You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/10/12 12:56:09 UTC

spark git commit: [SPARK-21907][CORE][BACKPORT 2.2] oom during spill

Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c5889b59d -> cd51e2c32


[SPARK-21907][CORE][BACKPORT 2.2] oom during spill

back-port #19181 to branch-2.2.

## What changes were proposed in this pull request?
1. a test reproducing [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907)
2. a fix for the root cause of the issue.

`org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls `org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset`, which frees the sorter's `array` and then allocates a new one. That allocation may itself trigger another (nested) spill.
When this happens, the `array` member has already been de-allocated but is still referenced by the code, which causes the nested spill to fail with an NPE in `org.apache.spark.memory.TaskMemoryManager.getPage`.
This patch introduces a test case that reproduces the problem, and a fix. The fix sets the in-memory sorter's `array` member to `null` and resets `usableCapacity`, `pos` and `nullBoundaryPos` to 0 before actually performing the allocation. This turns the nested spill code path into a no-op, so it never touches the de-allocated array, and it lets `free()` skip an array that has already been released instead of de-allocating it twice.

## How was this patch tested?
Introduced two new test cases: `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill` and `org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorterSuite#freeAfterOOM`.

Author: Eyal Farago <ey...@nrgene.com>

Closes #19481 from eyalfa/SPARK-21907__oom_during_spill__BACKPORT-2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd51e2c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd51e2c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd51e2c3

Branch: refs/heads/branch-2.2
Commit: cd51e2c321d793ac714566efb30f8957f9c5e117
Parents: c5889b5
Author: Eyal Farago <ey...@nrgene.com>
Authored: Thu Oct 12 14:56:04 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Oct 12 14:56:04 2017 +0200

----------------------------------------------------------------------
 .../unsafe/sort/UnsafeExternalSorter.java       |  4 ++
 .../unsafe/sort/UnsafeInMemorySorter.java       | 12 ++++-
 .../unsafe/sort/UnsafeExternalSorterSuite.java  | 32 ++++++++++++++
 .../unsafe/sort/UnsafeInMemorySorterSuite.java  | 46 ++++++++++++++++++++
 .../apache/spark/memory/TestMemoryManager.scala | 12 +++--
 5 files changed, 101 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cd51e2c3/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index f312fa2..e1ba588 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -473,6 +473,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     }
   }
 
+  @VisibleForTesting boolean hasSpaceForAnotherRecord() {
+    return inMemSorter.hasSpaceForAnotherRecord();
+  }
+
   /**
    * An UnsafeSorterIterator that support spilling.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/cd51e2c3/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index c14c126..869ec90 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -162,7 +162,9 @@ public final class UnsafeInMemorySorter {
    */
   public void free() {
     if (consumer != null) {
-      consumer.freeArray(array);
+      if (array != null) {
+        consumer.freeArray(array);
+      }
       array = null;
     }
   }
@@ -170,6 +172,14 @@ public final class UnsafeInMemorySorter {
   public void reset() {
     if (consumer != null) {
       consumer.freeArray(array);
+      // the call to consumer.allocateArray may trigger a spill
+      // which in turn access this instance and eventually re-enter this method and try to free the array again.
+      // by setting the array to null and its length to 0 we effectively make the spill code-path a no-op.
+      // setting the array to null also indicates that it has already been de-allocated which prevents a double de-allocation in free().
+      array = null;
+      usableCapacity = 0;
+      pos = 0;
+      nullBoundaryPos = 0;
       array = consumer.allocateArray(initialSize);
       usableCapacity = getUsableCapacity();
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/cd51e2c3/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 771d390..8d847da 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.UUID;
 
+import org.hamcrest.Matchers;
 import scala.Tuple2$;
 
 import org.junit.After;
@@ -454,5 +455,36 @@ public class UnsafeExternalSorterSuite {
     }
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    // we assume that given default configuration,
+    // the size of the data we insert to the sorter (ints)
+    // and assuming we shouldn't spill before pointers array is exhausted
+    // (memory manager is not configured to throw at this point)
+    // - so this loop runs a reasonable number of iterations (<2000).
+    // test indeed completed within <30ms (on a quad i7 laptop).
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // we expect the next insert to attempt growing the pointerssArray
+    // first allocation is expected to fail, then a spill is triggered which attempts another allocation
+    // which also fails and we expect to see this OOM here.
+    // the original code messed with a released array within the spill code
+    // and ended up with a failed assertion.
+    // we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+    memoryManager.markconsequentOOM(2);
+    try {
+      insertNumber(sorter, 1024);
+      fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
+    }
+    // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
+    catch (OutOfMemoryError oom){
+      String oomStackTrace = Utils.exceptionString(oom);
+      assertThat("expected OutOfMemoryError in org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset",
+              oomStackTrace,
+              Matchers.containsString("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"));
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cd51e2c3/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index bd89085..1a3e11e 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -35,6 +35,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.isIn;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 public class UnsafeInMemorySorterSuite {
@@ -139,4 +140,49 @@ public class UnsafeInMemorySorterSuite {
     }
     assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+    final SparkConf sparkConf = new SparkConf();
+    sparkConf.set("spark.memory.offHeap.enabled", "false");
+
+    final TestMemoryManager testMemoryManager =
+            new TestMemoryManager(sparkConf);
+    final TaskMemoryManager memoryManager = new TaskMemoryManager(
+            testMemoryManager, 0);
+    final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
+    final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
+    final Object baseObject = dataPage.getBaseObject();
+    // Write the records into the data page:
+    long position = dataPage.getBaseOffset();
+
+    final HashPartitioner hashPartitioner = new HashPartitioner(4);
+    // Use integer comparison for comparing prefixes (which are partition ids, in this case)
+    final PrefixComparator prefixComparator = PrefixComparators.LONG;
+    final RecordComparator recordComparator = new RecordComparator() {
+      @Override
+      public int compare(
+              Object leftBaseObject,
+              long leftBaseOffset,
+              Object rightBaseObject,
+              long rightBaseOffset) {
+        return 0;
+      }
+    };
+    UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
+            recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+    testMemoryManager.markExecutionAsOutOfMemoryOnce();
+    try {
+      sorter.reset();
+      fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
+    } catch (OutOfMemoryError oom) {
+      // as expected
+    }
+    // [SPARK-21907] this failed on NPE at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
+    sorter.free();
+    // simulate a 'back to back' free.
+    sorter.free();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cd51e2c3/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index 5f699df..c26945f 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -27,8 +27,8 @@ class TestMemoryManager(conf: SparkConf)
       numBytes: Long,
       taskAttemptId: Long,
       memoryMode: MemoryMode): Long = {
-    if (oomOnce) {
-      oomOnce = false
+    if (consequentOOM > 0) {
+      consequentOOM -= 1
       0
     } else if (available >= numBytes) {
       available -= numBytes
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)
 
   override def maxOffHeapStorageMemory: Long = 0L
 
-  private var oomOnce = false
+  private var consequentOOM = 0
   private var available = Long.MaxValue
 
   def markExecutionAsOutOfMemoryOnce(): Unit = {
-    oomOnce = true
+    markconsequentOOM(1)
+  }
+
+  def markconsequentOOM(n : Int) : Unit = {
+    consequentOOM += n
   }
 
   def limit(avail: Long): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org