Posted to commits@spark.apache.org by we...@apache.org on 2018/02/11 16:03:55 UTC

spark git commit: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail

Repository: spark
Updated Branches:
  refs/heads/master eacb62fbb -> 4bbd7443e


[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail

## What changes were proposed in this pull request?

This is a long-standing bug in `UnsafeKVExternalSorter` that has been reported on the dev list multiple times.

When creating `UnsafeKVExternalSorter` with `BytesToBytesMap`, we need to create an `UnsafeInMemorySorter` to sort the data in `BytesToBytesMap`. The data format of the sorter and the map is the same, so no data movement is required. However, both the sorter and the map need a pointer array for some bookkeeping work.

There is an optimization in `UnsafeKVExternalSorter`: reuse the pointer array between the sorter and the map, to avoid an extra memory allocation. This sounds like a reasonable optimization: the length of the `BytesToBytesMap` pointer array is at least 4 times the number of keys (to avoid hash collisions, the hash table size should be at least 2 times the number of keys, and each key occupies 2 slots). `UnsafeInMemorySorter` needs the pointer array size to be 4 times the number of entries, so reusing the pointer array appears safe.

However, the number of keys in the map does not equal the number of entries in the map, because `BytesToBytesMap` supports duplicated keys. This breaks the assumption behind the above optimization: we may run out of space when inserting data into the sorter and hit the following error:
```
java.lang.IllegalStateException: There is no space for new record
   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:239)
   at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:149)
...
```

This PR fixes this bug by creating a new pointer array if the existing one is not big enough.
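
The sizing rule behind the fix can be sketched outside Spark as plain arithmetic. This is only an illustration under stated assumptions: the class `PointerArraySizing` and its methods are hypothetical names, not part of Spark, and the numbers in `main` (a 128-slot array, 32 versus 65 entries) are made up to mirror a map that holds one key appended many times.

```java
// Hypothetical, standalone sketch of the pointer array sizing check.
// None of these names exist in Spark; they only mirror the arithmetic in the patch.
public final class PointerArraySizing {

    private PointerArraySizing() {}

    // Each entry needs 2 slots (record pointer + key prefix), and the in-place
    // sort needs half of the array to stay empty, hence 4 slots per entry.
    static long requiredSize(long numValues) {
        return numValues * 4L;
    }

    // The existing array can be reused only if it can hold every entry,
    // counting duplicated keys, not just the distinct keys.
    static boolean canReuse(long existingArraySize, long numValues) {
        return numValues <= existingArraySize / 4;
    }

    // Size of the array the sorter ends up with: the existing size when it is
    // big enough, otherwise the size of a freshly allocated array.
    static long chooseSize(long existingArraySize, long numValues) {
        return canReuse(existingArraySize, numValues)
            ? existingArraySize
            : requiredSize(numValues);
    }

    public static void main(String[] args) {
        long existingArraySize = 128L; // assumed size of the map's pointer array

        // 32 entries fit in the borrowed array.
        System.out.println("reuse for 32 entries: " + canReuse(existingArraySize, 32L));

        // 65 entries (for example, one key appended 65 times) do not fit.
        System.out.println("reuse for 65 entries: " + canReuse(existingArraySize, 65L));
        System.out.println("size chosen for 65 entries: " + chooseSize(existingArraySize, 65L));
    }
}
```

The comparison counts entries rather than distinct keys, which is the distinction the old assertion on `map.numKeys()` missed.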

## How was this patch tested?

A new test in `UnsafeKVExternalSorterSuite`.

Author: Wenchen Fan <we...@databricks.com>

Closes #20561 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4bbd7443
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4bbd7443
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4bbd7443

Branch: refs/heads/master
Commit: 4bbd7443ebb005f81ed6bc39849940ac8db3b3cc
Parents: eacb62f
Author: Wenchen Fan <we...@databricks.com>
Authored: Mon Feb 12 00:03:49 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Feb 12 00:03:49 2018 +0800

----------------------------------------------------------------------
 .../sql/execution/UnsafeKVExternalSorter.java   | 31 ++++++++++++----
 .../execution/UnsafeKVExternalSorterSuite.scala | 39 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4bbd7443/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index b0b5383..9eb0343 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -34,6 +34,7 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.KVIterator;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.collection.unsafe.sort.*;
@@ -98,19 +99,33 @@ public final class UnsafeKVExternalSorter {
         numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
-      // The array will be used to do in-place sort, which require half of the space to be empty.
-      // Note: each record in the map takes two entries in the array, one is record pointer,
-      // another is the key prefix.
-      assert(map.numKeys() * 2 <= map.getArray().size() / 2);
-      // During spilling, the array in map will not be used, so we can borrow that and use it
-      // as the underlying array for in-memory sorter (it's always large enough).
-      // Since we will not grow the array, it's fine to pass `null` as consumer.
+      // During spilling, the pointer array in `BytesToBytesMap` will not be used, so we can borrow
+      // that and use it as the pointer array for `UnsafeInMemorySorter`.
+      LongArray pointerArray = map.getArray();
+      // `BytesToBytesMap`'s pointer array is only guaranteed to hold all the distinct keys, but
+      // `UnsafeInMemorySorter`'s pointer array need to hold all the entries. Since
+      // `BytesToBytesMap` can have duplicated keys, here we need a check to make sure the pointer
+      // array can hold all the entries in `BytesToBytesMap`.
+      // The pointer array will be used to do in-place sort, which requires half of the space to be
+      // empty. Note: each record in the map takes two entries in the pointer array, one is record
+      // pointer, another is key prefix. So the required size of pointer array is `numRecords * 4`.
+      // TODO: It's possible to change UnsafeInMemorySorter to have multiple entries with same key,
+      // so that we can always reuse the pointer array.
+      if (map.numValues() > pointerArray.size() / 4) {
+        // Here we ask the map to allocate memory, so that the memory manager won't ask the map
+        // to spill, if the memory is not enough.
+        pointerArray = map.allocateArray(map.numValues() * 4L);
+      }
+
+      // Since the pointer array(either reuse the one in the map, or create a new one) is guaranteed
+      // to be large enough, it's fine to pass `null` as consumer because we won't allocate more
+      // memory.
       final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
         null,
         taskMemoryManager,
         comparatorSupplier.get(),
         prefixComparator,
-        map.getArray(),
+        pointerArray,
         canUseRadixSort);
 
       // We cannot use the destructive iterator here because we are reusing the existing memory

http://git-wip-us.apache.org/repos/asf/spark/blob/4bbd7443/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 6af9f8b..bf588d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.map.BytesToBytesMap
 
 /**
  * Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       spill = true
     )
   }
+
+  test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap having duplicated keys") {
+    val memoryManager = new TestMemoryManager(new SparkConf())
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
+    val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes())
+
+    // Key/value are a unsafe rows with a single int column
+    val schema = new StructType().add("i", IntegerType)
+    val key = new UnsafeRow(1)
+    key.pointTo(new Array[Byte](32), 32)
+    key.setInt(0, 1)
+    val value = new UnsafeRow(1)
+    value.pointTo(new Array[Byte](32), 32)
+    value.setInt(0, 2)
+
+    for (_ <- 1 to 65) {
+      val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
+      loc.append(
+        key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+        value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
+    }
+
+    // Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap`
+    // which has duplicated keys and the number of entries exceeds its capacity.
+    try {
+      TaskContext.setTaskContext(new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, null, null))
+      new UnsafeKVExternalSorter(
+        schema,
+        schema,
+        sparkContext.env.blockManager,
+        sparkContext.env.serializerManager,
+        taskMemoryManager.pageSizeBytes(),
+        Int.MaxValue,
+        map)
+    } finally {
+      TaskContext.unset()
+    }
+  }
 }

