You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/06/14 18:35:13 UTC

spark git commit: [SPARK-8354] [SQL] Fix off-by-factor-of-8 error when allocating scratch space in UnsafeFixedWidthAggregationMap

Repository: spark
Updated Branches:
  refs/heads/master cb7ada119 -> ea7fd2ff6


[SPARK-8354] [SQL] Fix off-by-factor-of-8 error when allocating scratch space in UnsafeFixedWidthAggregationMap

UnsafeFixedWidthAggregationMap contains an off-by-factor-of-8 error when allocating row conversion scratch space: we take a size requirement, measured in bytes, then allocate a long array of that size.  This means that we end up allocating 8x too much conversion space.

This patch fixes this by allocating a `byte[]` array instead.  This doesn't impose any new limitations on the maximum sizes of UnsafeRows, since UnsafeRowConverter already used integers when calculating the size requirements for rows.

Author: Josh Rosen <jo...@databricks.com>

Closes #6809 from JoshRosen/sql-bytes-vs-words-fix and squashes the following commits:

6520339 [Josh Rosen] Updates to reflect fact that UnsafeRow max size is constrained by max byte[] size


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea7fd2ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea7fd2ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea7fd2ff

Branch: refs/heads/master
Commit: ea7fd2ff6454e8d819a39bf49901074e49b5714e
Parents: cb7ada1
Author: Josh Rosen <jo...@databricks.com>
Authored: Sun Jun 14 09:34:35 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Sun Jun 14 09:34:35 2015 -0700

----------------------------------------------------------------------
 .../UnsafeFixedWidthAggregationMap.java         | 30 ++++++++++----------
 .../expressions/UnsafeRowConverter.scala        |  2 +-
 2 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ea7fd2ff/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
index b23e0ef..f7849eb 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
@@ -39,7 +39,7 @@ public final class UnsafeFixedWidthAggregationMap {
    * An empty aggregation buffer, encoded in UnsafeRow format. When inserting a new key into the
    * map, we copy this buffer and use it as the value.
    */
-  private final long[] emptyAggregationBuffer;
+  private final byte[] emptyAggregationBuffer;
 
   private final StructType aggregationBufferSchema;
 
@@ -63,10 +63,10 @@ public final class UnsafeFixedWidthAggregationMap {
   /**
    * Scratch space that is used when encoding grouping keys into UnsafeRow format.
    *
-   * By default, this is a 1MB array, but it will grow as necessary in case larger keys are
+   * By default, this is a 8 kb array, but it will grow as necessary in case larger keys are
    * encountered.
    */
-  private long[] groupingKeyConversionScratchSpace = new long[1024 / 8];
+  private byte[] groupingKeyConversionScratchSpace = new byte[1024 * 8];
 
   private final boolean enablePerfMetrics;
 
@@ -123,13 +123,13 @@ public final class UnsafeFixedWidthAggregationMap {
   }
 
   /**
-   * Convert a Java object row into an UnsafeRow, allocating it into a new long array.
+   * Convert a Java object row into an UnsafeRow, allocating it into a new byte array.
    */
-  private static long[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
+  private static byte[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
     final UnsafeRowConverter converter = new UnsafeRowConverter(schema);
-    final long[] unsafeRow = new long[converter.getSizeRequirement(javaRow)];
-    final long writtenLength =
-      converter.writeRow(javaRow, unsafeRow, PlatformDependent.LONG_ARRAY_OFFSET);
+    final byte[] unsafeRow = new byte[converter.getSizeRequirement(javaRow)];
+    final int writtenLength =
+      converter.writeRow(javaRow, unsafeRow, PlatformDependent.BYTE_ARRAY_OFFSET);
     assert (writtenLength == unsafeRow.length): "Size requirement calculation was wrong!";
     return unsafeRow;
   }
@@ -143,34 +143,34 @@ public final class UnsafeFixedWidthAggregationMap {
     // Make sure that the buffer is large enough to hold the key. If it's not, grow it:
     if (groupingKeySize > groupingKeyConversionScratchSpace.length) {
       // This new array will be initially zero, so there's no need to zero it out here
-      groupingKeyConversionScratchSpace = new long[groupingKeySize];
+      groupingKeyConversionScratchSpace = new byte[groupingKeySize];
     } else {
       // Zero out the buffer that's used to hold the current row. This is necessary in order
       // to ensure that rows hash properly, since garbage data from the previous row could
       // otherwise end up as padding in this row. As a performance optimization, we only zero out
       // the portion of the buffer that we'll actually write to.
-      Arrays.fill(groupingKeyConversionScratchSpace, 0, groupingKeySize, 0);
+      Arrays.fill(groupingKeyConversionScratchSpace, 0, groupingKeySize, (byte) 0);
     }
-    final long actualGroupingKeySize = groupingKeyToUnsafeRowConverter.writeRow(
+    final int actualGroupingKeySize = groupingKeyToUnsafeRowConverter.writeRow(
       groupingKey,
       groupingKeyConversionScratchSpace,
-      PlatformDependent.LONG_ARRAY_OFFSET);
+      PlatformDependent.BYTE_ARRAY_OFFSET);
     assert (groupingKeySize == actualGroupingKeySize) : "Size requirement calculation was wrong!";
 
     // Probe our map using the serialized key
     final BytesToBytesMap.Location loc = map.lookup(
       groupingKeyConversionScratchSpace,
-      PlatformDependent.LONG_ARRAY_OFFSET,
+      PlatformDependent.BYTE_ARRAY_OFFSET,
       groupingKeySize);
     if (!loc.isDefined()) {
       // This is the first time that we've seen this grouping key, so we'll insert a copy of the
       // empty aggregation buffer into the map:
       loc.putNewKey(
         groupingKeyConversionScratchSpace,
-        PlatformDependent.LONG_ARRAY_OFFSET,
+        PlatformDependent.BYTE_ARRAY_OFFSET,
         groupingKeySize,
         emptyAggregationBuffer,
-        PlatformDependent.LONG_ARRAY_OFFSET,
+        PlatformDependent.BYTE_ARRAY_OFFSET,
         emptyAggregationBuffer.length
       );
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ea7fd2ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
index d771e45..5c92f41 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
@@ -68,7 +68,7 @@ class UnsafeRowConverter(fieldTypes: Array[DataType]) {
    * @param baseOffset the base offset of the destination address
    * @return the number of bytes written. This should be equal to `getSizeRequirement(row)`.
    */
-  def writeRow(row: InternalRow, baseObject: Object, baseOffset: Long): Long = {
+  def writeRow(row: InternalRow, baseObject: Object, baseOffset: Long): Int = {
     unsafeRow.pointTo(baseObject, baseOffset, writers.length, null)
     var fieldNumber = 0
     var appendCursor: Int = fixedLengthSize


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org