You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/10/12 12:56:09 UTC
spark git commit: [SPARK-21907][CORE][BACKPORT 2.2] oom during spill
Repository: spark
Updated Branches:
refs/heads/branch-2.2 c5889b59d -> cd51e2c32
[SPARK-21907][CORE][BACKPORT 2.2] oom during spill
back-port #19181 to branch-2.2.
## What changes were proposed in this pull request?
1. a test reproducing [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907)
2. a fix for the root cause of the issue.
`org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls `org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset`, which may trigger another spill.
When this happens, the `array` member is already de-allocated but still referenced by the code; this causes the nested spill to fail with an NPE in `org.apache.spark.memory.TaskMemoryManager.getPage`.
This patch introduces a reproduction in a test case and a fix. The fix simply sets the in-memory sorter's `array` member to null (and resets its usable capacity and positions to 0) before actually performing the allocation. This prevents the spilling code from 'touching' the de-allocated array, and the null check added to `free()` prevents a double de-allocation.
## How was this patch tested?
introduced a new test case: `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`.
Author: Eyal Farago <ey...@nrgene.com>
Closes #19481 from eyalfa/SPARK-21907__oom_during_spill__BACKPORT-2.2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd51e2c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd51e2c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd51e2c3
Branch: refs/heads/branch-2.2
Commit: cd51e2c321d793ac714566efb30f8957f9c5e117
Parents: c5889b5
Author: Eyal Farago <ey...@nrgene.com>
Authored: Thu Oct 12 14:56:04 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Oct 12 14:56:04 2017 +0200
----------------------------------------------------------------------
.../unsafe/sort/UnsafeExternalSorter.java | 4 ++
.../unsafe/sort/UnsafeInMemorySorter.java | 12 ++++-
.../unsafe/sort/UnsafeExternalSorterSuite.java | 32 ++++++++++++++
.../unsafe/sort/UnsafeInMemorySorterSuite.java | 46 ++++++++++++++++++++
.../apache/spark/memory/TestMemoryManager.scala | 12 +++--
5 files changed, 101 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cd51e2c3/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index f312fa2..e1ba588 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -473,6 +473,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
}
+ @VisibleForTesting boolean hasSpaceForAnotherRecord() {
+ return inMemSorter.hasSpaceForAnotherRecord();
+ }
+
/**
* An UnsafeSorterIterator that support spilling.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/cd51e2c3/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index c14c126..869ec90 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -162,7 +162,9 @@ public final class UnsafeInMemorySorter {
*/
public void free() {
if (consumer != null) {
- consumer.freeArray(array);
+ if (array != null) {
+ consumer.freeArray(array);
+ }
array = null;
}
}
@@ -170,6 +172,14 @@ public final class UnsafeInMemorySorter {
public void reset() {
if (consumer != null) {
consumer.freeArray(array);
+ // the call to consumer.allocateArray may trigger a spill
+ // which in turn access this instance and eventually re-enter this method and try to free the array again.
+ // by setting the array to null and its length to 0 we effectively make the spill code-path a no-op.
+ // setting the array to null also indicates that it has already been de-allocated which prevents a double de-allocation in free().
+ array = null;
+ usableCapacity = 0;
+ pos = 0;
+ nullBoundaryPos = 0;
array = consumer.allocateArray(initialSize);
usableCapacity = getUsableCapacity();
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cd51e2c3/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 771d390..8d847da 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.UUID;
+import org.hamcrest.Matchers;
import scala.Tuple2$;
import org.junit.After;
@@ -454,5 +455,36 @@ public class UnsafeExternalSorterSuite {
}
}
+ @Test
+ public void testOOMDuringSpill() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
+ // we assume that given default configuration,
+ // the size of the data we insert to the sorter (ints)
+ // and assuming we shouldn't spill before pointers array is exhausted
+ // (memory manager is not configured to throw at this point)
+ // - so this loop runs a reasonable number of iterations (<2000).
+ // test indeed completed within <30ms (on a quad i7 laptop).
+ for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+ insertNumber(sorter, i);
+ }
+ // we expect the next insert to attempt growing the pointerssArray
+ // first allocation is expected to fail, then a spill is triggered which attempts another allocation
+ // which also fails and we expect to see this OOM here.
+ // the original code messed with a released array within the spill code
+ // and ended up with a failed assertion.
+ // we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+ memoryManager.markconsequentOOM(2);
+ try {
+ insertNumber(sorter, 1024);
+ fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
+ }
+ // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
+ catch (OutOfMemoryError oom){
+ String oomStackTrace = Utils.exceptionString(oom);
+ assertThat("expected OutOfMemoryError in org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset",
+ oomStackTrace,
+ Matchers.containsString("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cd51e2c3/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index bd89085..1a3e11e 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -35,6 +35,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.isIn;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
public class UnsafeInMemorySorterSuite {
@@ -139,4 +140,49 @@ public class UnsafeInMemorySorterSuite {
}
assertEquals(dataToSort.length, iterLength);
}
+
+ @Test
+ public void freeAfterOOM() {
+ final SparkConf sparkConf = new SparkConf();
+ sparkConf.set("spark.memory.offHeap.enabled", "false");
+
+ final TestMemoryManager testMemoryManager =
+ new TestMemoryManager(sparkConf);
+ final TaskMemoryManager memoryManager = new TaskMemoryManager(
+ testMemoryManager, 0);
+ final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
+ final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
+ final Object baseObject = dataPage.getBaseObject();
+ // Write the records into the data page:
+ long position = dataPage.getBaseOffset();
+
+ final HashPartitioner hashPartitioner = new HashPartitioner(4);
+ // Use integer comparison for comparing prefixes (which are partition ids, in this case)
+ final PrefixComparator prefixComparator = PrefixComparators.LONG;
+ final RecordComparator recordComparator = new RecordComparator() {
+ @Override
+ public int compare(
+ Object leftBaseObject,
+ long leftBaseOffset,
+ Object rightBaseObject,
+ long rightBaseOffset) {
+ return 0;
+ }
+ };
+ UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
+ recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+ testMemoryManager.markExecutionAsOutOfMemoryOnce();
+ try {
+ sorter.reset();
+ fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
+ } catch (OutOfMemoryError oom) {
+ // as expected
+ }
+ // [SPARK-21907] this failed on NPE at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
+ sorter.free();
+ // simulate a 'back to back' free.
+ sorter.free();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cd51e2c3/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index 5f699df..c26945f 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -27,8 +27,8 @@ class TestMemoryManager(conf: SparkConf)
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = {
- if (oomOnce) {
- oomOnce = false
+ if (consequentOOM > 0) {
+ consequentOOM -= 1
0
} else if (available >= numBytes) {
available -= numBytes
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)
override def maxOffHeapStorageMemory: Long = 0L
- private var oomOnce = false
+ private var consequentOOM = 0
private var available = Long.MaxValue
def markExecutionAsOutOfMemoryOnce(): Unit = {
- oomOnce = true
+ markconsequentOOM(1)
+ }
+
+ def markconsequentOOM(n : Int) : Unit = {
+ consequentOOM += n
}
def limit(avail: Long): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org