Posted to commits@spark.apache.org by li...@apache.org on 2015/09/30 08:30:32 UTC

spark git commit: [SPARK-10811] [SQL] Eliminates unnecessary byte array copying

Repository: spark
Updated Branches:
  refs/heads/master c1ad373f2 -> 4d5a005b0


[SPARK-10811] [SQL] Eliminates unnecessary byte array copying

When reading Parquet string and binary-backed decimal values, Parquet's `Binary.getBytes` always returns a copied byte array, which is unnecessary. Since the underlying implementation of these `Binary` values is guaranteed to be `ByteArraySliceBackedBinary`, and Parquet itself never reuses the underlying byte arrays, we can use `Binary.toByteBuffer.array()` to steal the underlying byte arrays without copying them.
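
For reference, here is a minimal Scala sketch of the zero-copy pattern (the helper name and tuple return are illustrative, not part of this patch); it assumes the `Binary` value is heap-backed, which Parquet guarantees in this code path:

    import org.apache.parquet.io.api.Binary

    // Expose the backing byte array of a heap-backed Binary without copying it.
    // The slice starts at the buffer's position and spans (limit - position) bytes.
    def binarySliceWithoutCopy(value: Binary): (Array[Byte], Int, Int) = {
      val buffer = value.toByteBuffer
      val offset = buffer.position()
      val numBytes = buffer.limit() - offset
      (buffer.array(), offset, numBytes)   // shares the backing array, no copy made
    }

    // By contrast, `value.getBytes` always allocates and fills a fresh array.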

This brings performance benefits when scanning Parquet string and binary-backed decimal columns. Note that this trick doesn't cover binary-backed decimals with a precision greater than 18 (illustrated below).
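
To see where the precision-18 limit comes from (a sketch of the arithmetic, not the Spark code itself): an unscaled decimal with precision p needs up to p decimal digits, and a signed 64-bit long tops out at 9223372036854775807 (19 digits), so only 18 digits are always representable as an unscaled `Long`:

    // Largest decimal precision whose unscaled value is guaranteed to fit in a Long.
    val maxPrecisionForLong = math.floor(math.log10(math.pow(2, 63) - 1)).toInt  // 18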

In my micro-benchmark, this brings a ~15% performance boost when scanning the TPC-DS `store_sales` table (scale factor 15).

Another minor optimization in this PR: `Decimal.toJavaBigDecimal` now constructs a Java `BigDecimal` directly instead of constructing a Scala `BigDecimal` first. This brings another ~5% performance gain.
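
The idea, as a simplified sketch (field names abbreviated; see the actual diff below): when the decimal is still held as an unscaled long, build the Java `BigDecimal` directly from that long and the scale:

    // decimalVal is null while the value is still tracked as an unscaled Long.
    def toJavaBigDecimalSketch(
        decimalVal: scala.math.BigDecimal,
        longVal: Long,
        scale: Int): java.math.BigDecimal = {
      if (decimalVal ne null) {
        decimalVal.underlying()                      // already wraps a Java BigDecimal
      } else {
        java.math.BigDecimal.valueOf(longVal, scale) // skip the Scala BigDecimal wrapper
      }
    }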

Author: Cheng Lian <li...@databricks.com>

Closes #8907 from liancheng/spark-10811/eliminate-array-copying.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d5a005b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d5a005b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d5a005b

Branch: refs/heads/master
Commit: 4d5a005b0d2591d4d57a19be48c4954b9f1434a9
Parents: c1ad373
Author: Cheng Lian <li...@databricks.com>
Authored: Tue Sep 29 23:30:27 2015 -0700
Committer: Cheng Lian <li...@databricks.com>
Committed: Tue Sep 29 23:30:27 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/types/Decimal.scala    |  8 +++++-
 .../parquet/CatalystRowConverter.scala          | 26 ++++++++++++++------
 .../parquet/CatalystSchemaConverter.scala       |  4 +--
 3 files changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4d5a005b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index c988f1d..bfcf111 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -145,7 +145,13 @@ final class Decimal extends Ordered[Decimal] with Serializable {
     }
   }
 
-  def toJavaBigDecimal: java.math.BigDecimal = toBigDecimal.underlying()
+  def toJavaBigDecimal: java.math.BigDecimal = {
+    if (decimalVal.ne(null)) {
+      decimalVal.underlying()
+    } else {
+      java.math.BigDecimal.valueOf(longVal, _scale)
+    }
+  }
 
   def toUnscaledLong: Long = {
     if (decimalVal.ne(null)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/4d5a005b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 2ff2fda..050d361 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -302,7 +302,13 @@ private[parquet] class CatalystRowConverter(
     }
 
     override def addBinary(value: Binary): Unit = {
-      updater.set(UTF8String.fromBytes(value.getBytes))
+      // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we
+      // are using `Binary.toByteBuffer.array()` to steal the underlying byte array without copying
+      // it.
+      val buffer = value.toByteBuffer
+      val offset = buffer.position()
+      val numBytes = buffer.limit() - buffer.position()
+      updater.set(UTF8String.fromBytes(buffer.array(), offset, numBytes))
     }
   }
 
@@ -332,24 +338,30 @@ private[parquet] class CatalystRowConverter(
     private def toDecimal(value: Binary): Decimal = {
       val precision = decimalType.precision
       val scale = decimalType.scale
-      val bytes = value.getBytes
 
       if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
-        // Constructs a `Decimal` with an unscaled `Long` value if possible.
+        // Constructs a `Decimal` with an unscaled `Long` value if possible.  The underlying
+        // `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we are using
+        // `Binary.toByteBuffer.array()` to steal the underlying byte array without copying it.
+        val buffer = value.toByteBuffer
+        val bytes = buffer.array()
+        val start = buffer.position()
+        val end = buffer.limit()
+
         var unscaled = 0L
-        var i = 0
+        var i = start
 
-        while (i < bytes.length) {
+        while (i < end) {
           unscaled = (unscaled << 8) | (bytes(i) & 0xff)
           i += 1
         }
 
-        val bits = 8 * bytes.length
+        val bits = 8 * (end - start)
         unscaled = (unscaled << (64 - bits)) >> (64 - bits)
         Decimal(unscaled, precision, scale)
       } else {
         // Otherwise, resorts to an unscaled `BigInteger` instead.
-        Decimal(new BigDecimal(new BigInteger(bytes), scale), precision, scale)
+        Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4d5a005b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 2d237da..97ffeb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -567,9 +567,9 @@ private[parquet] object CatalystSchemaConverter {
     }
   }
 
-  val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4)
+  val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4) /* 9 */
 
-  val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8)
+  val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8) /* 18 */
 
   // Max precision of a decimal value stored in `numBytes` bytes
   def maxPrecisionForBytes(numBytes: Int): Int = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org