You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2020/10/08 08:01:43 UTC
[spark] branch branch-2.4 updated: [SPARK-32901][CORE][2.4] Do not
allocate memory while spilling UnsafeExternalSorter
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 45a8b89 [SPARK-32901][CORE][2.4] Do not allocate memory while spilling UnsafeExternalSorter
45a8b89 is described below
commit 45a8b892455daaad34d97e37356dee85256d316d
Author: Tom van Bussel <to...@databricks.com>
AuthorDate: Thu Oct 8 09:58:01 2020 +0200
[SPARK-32901][CORE][2.4] Do not allocate memory while spilling UnsafeExternalSorter
Backport of #29785 to Spark 2.4
### What changes were proposed in this pull request?
This PR changes `UnsafeExternalSorter` to no longer allocate any memory while spilling. In particular it removes the allocation of a new pointer array in `UnsafeInMemorySorter`. Instead the new pointer array is allocated whenever the next record is inserted into the sorter.
### Why are the changes needed?
Without this change the `UnsafeExternalSorter` could throw an OOM while spilling. The following sequence of events would have triggered an OOM:
1. `UnsafeExternalSorter` runs out of space in its pointer array and attempts to allocate a new large array to replace the old one.
2. `TaskMemoryManager` tries to allocate the memory backing the new large array using `MemoryManager`, but `MemoryManager` is only willing to return most but not all of the memory requested.
3. `TaskMemoryManager` asks `UnsafeExternalSorter` to spill, which causes `UnsafeExternalSorter` to spill the current run to disk, to free its record pages and to reset its `UnsafeInMemorySorter`.
4. `UnsafeInMemorySorter` frees the old pointer array, and tries to allocate a new small pointer array.
5. `TaskMemoryManager` tries to allocate the memory backing the small array using `MemoryManager`, but `MemoryManager` is unwilling to give it any memory, as the `TaskMemoryManager` is still holding on to the memory it got for the new large array.
6. `TaskMemoryManager` again asks `UnsafeExternalSorter` to spill, but this time there is nothing to spill.
7. `UnsafeInMemorySorter` receives less memory than it requested, and causes a `SparkOutOfMemoryError` to be thrown, which causes the current task to fail.
With the changes in the PR the following will happen instead:
1. `UnsafeExternalSorter` runs out of space in its pointer array and attempts to allocate a new large array to replace the old one.
2. `TaskMemoryManager` tries to allocate the memory backing the new large array using `MemoryManager`, but `MemoryManager` is only willing to return most but not all of the memory requested.
3. `TaskMemoryManager` asks `UnsafeExternalSorter` to spill, which causes `UnsafeExternalSorter` to spill the current run to disk, to free its record pages and to reset its `UnsafeInMemorySorter`.
4. `UnsafeInMemorySorter` frees the old pointer array.
5. `TaskMemoryManager` returns control to `UnsafeExternalSorter.growPointerArrayIfNecessary` (either by returning the new large array or by throwing a `SparkOutOfMemoryError`).
6. `UnsafeExternalSorter` either frees the new large array or it ignores the `SparkOutOfMemoryError` depending on what happened in the previous step.
7. `UnsafeExternalSorter` successfully allocates a new small pointer array and operation continues as normal.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tests were added in `UnsafeExternalSorterSuite` and `UnsafeInMemorySorterSuite`.
Closes #29910 from tomvanbussel/backport-SPARK-32901.
Authored-by: Tom van Bussel <to...@databricks.com>
Signed-off-by: herman <he...@databricks.com>
---
.../unsafe/sort/UnsafeExternalSorter.java | 96 ++++++++++++++++------
.../unsafe/sort/UnsafeInMemorySorter.java | 51 ++++++------
.../unsafe/sort/UnsafeExternalSorterSuite.java | 45 ++++------
.../unsafe/sort/UnsafeInMemorySorterSuite.java | 39 ++++-----
.../apache/spark/memory/TestMemoryManager.scala | 12 ++-
5 files changed, 143 insertions(+), 100 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index f720ccd..9552e79 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -203,6 +203,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
if (inMemSorter == null || inMemSorter.numRecords() <= 0) {
+ // There could still be some memory allocated when there are no records in the in-memory
+ // sorter. We will not spill it however, to ensure that we can always process at least one
+ // record before spilling. See the comments in `allocateMemoryForRecordIfNecessary` for why
+ // this is necessary.
return 0L;
}
@@ -226,7 +230,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Note that this is more-or-less going to be a multiple of the page size, so wasted space in
// pages will currently be counted as memory spilled even though that space isn't actually
// written to disk. This also counts the space needed to store the sorter's pointer array.
- inMemSorter.reset();
+ inMemSorter.freeMemory();
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
// records. Otherwise, if the task is over allocated memory, then without freeing the memory
// pages, we might not be able to get memory for the pointer array.
@@ -327,7 +331,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
deleteSpillFiles();
freeMemory();
if (inMemSorter != null) {
- inMemSorter.free();
+ inMemSorter.freeMemory();
inMemSorter = null;
}
}
@@ -341,40 +345,53 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
+ if (inMemSorter.numRecords() <= 0) {
+ // Spilling was triggered just before this method was called. The pointer array was freed
+ // during the spill, so a new pointer array needs to be allocated here.
+ LongArray array = allocateArray(inMemSorter.getInitialSize());
+ inMemSorter.expandPointerArray(array);
+ return;
+ }
+
long used = inMemSorter.getMemoryUsage();
- LongArray array;
+ LongArray array = null;
try {
// could trigger spilling
array = allocateArray(used / 8 * 2);
} catch (TooLargePageException e) {
// The pointer array is too big to fix in a single page, spill.
spill();
- return;
} catch (SparkOutOfMemoryError e) {
- // should have trigger spilling
- if (!inMemSorter.hasSpaceForAnotherRecord()) {
+ if (inMemSorter.numRecords() > 0) {
logger.error("Unable to grow the pointer array");
throw e;
}
- return;
+ // The new array could not be allocated, but that is not an issue as it is no longer
+ // needed, as all records were spilled.
}
- // check if spilling is triggered or not
- if (inMemSorter.hasSpaceForAnotherRecord()) {
- freeArray(array);
- } else {
- inMemSorter.expandPointerArray(array);
+
+ if (inMemSorter.numRecords() <= 0) {
+ // Spilling was triggered while trying to allocate the new array.
+ if (array != null) {
+ // We succeeded in allocating the new array, but, since all records were spilled, a
+ // smaller array would also suffice.
+ freeArray(array);
+ }
+ // The pointer array was freed during the spill, so a new pointer array needs to be
+ // allocated here.
+ array = allocateArray(inMemSorter.getInitialSize());
}
+ inMemSorter.expandPointerArray(array);
}
}
/**
- * Allocates more memory in order to insert an additional record. This will request additional
- * memory from the memory manager and spill if the requested memory can not be obtained.
+ * Allocates an additional page in order to insert an additional record. This will request
+ * additional memory from the memory manager and spill if the requested memory can not be
+ * obtained.
*
* @param required the required space in the data page, in bytes, including space for storing
- * the record size. This must be less than or equal to the page size (records
- * that exceed the page size are handled via a different code path which uses
- * special overflow pages).
+ * the record size.
*/
private void acquireNewPageIfNecessary(int required) {
if (currentPage == null ||
@@ -387,6 +404,37 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
/**
+ * Allocates more memory in order to insert an additional record. This will request additional
+ * memory from the memory manager and spill if the requested memory can not be obtained.
+ *
+ * @param required the required space in the data page, in bytes, including space for storing
+ * the record size.
+ */
+ private void allocateMemoryForRecordIfNecessary(int required) throws IOException {
+ // Step 1:
+ // Ensure that the pointer array has space for another record. This may cause a spill.
+ growPointerArrayIfNecessary();
+ // Step 2:
+ // Ensure that the last page has space for another record. This may cause a spill.
+ acquireNewPageIfNecessary(required);
+ // Step 3:
+ // The allocation in step 2 could have caused a spill, which would have freed the pointer
+ // array allocated in step 1. Therefore we need to check again whether we have to allocate
+ // a new pointer array.
+ //
+ // If the allocation in this step causes a spill event then it will not cause the page
+ // allocated in the previous step to be freed. The function `spill` only frees memory if at
+ // least one record has been inserted in the in-memory sorter. This will not be the case if
+ // we have spilled in the previous step.
+ //
+ // If we did not spill in the previous step then `growPointerArrayIfNecessary` will be a
+ // no-op that does not allocate any memory, and therefore can't cause a spill event.
+ //
+ // Thus there is no need to call `acquireNewPageIfNecessary` again after this step.
+ growPointerArrayIfNecessary();
+ }
+
+ /**
* Write a record to the sorter.
*/
public void insertRecord(
@@ -400,11 +448,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
spill();
}
- growPointerArrayIfNecessary();
- int uaoSize = UnsafeAlignedOffset.getUaoSize();
+ final int uaoSize = UnsafeAlignedOffset.getUaoSize();
// Need 4 or 8 bytes to store the record length.
final int required = length + uaoSize;
- acquireNewPageIfNecessary(required);
+ allocateMemoryForRecordIfNecessary(required);
final Object base = currentPage.getBaseObject();
final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
@@ -427,10 +474,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
Object valueBase, long valueOffset, int valueLen, long prefix, boolean prefixIsNull)
throws IOException {
- growPointerArrayIfNecessary();
- int uaoSize = UnsafeAlignedOffset.getUaoSize();
+ final int uaoSize = UnsafeAlignedOffset.getUaoSize();
final int required = keyLen + valueLen + (2 * uaoSize);
- acquireNewPageIfNecessary(required);
+ allocateMemoryForRecordIfNecessary(required);
final Object base = currentPage.getBaseObject();
final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
@@ -565,7 +611,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
assert(inMemSorter != null);
released += inMemSorter.getMemoryUsage();
totalSortTimeNanos += inMemSorter.getSortTimeNanos();
- inMemSorter.free();
+ inMemSorter.freeMemory();
inMemSorter = null;
taskContext.taskMetrics().incMemoryBytesSpilled(released);
taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
@@ -660,7 +706,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
i += spillWriter.recordsSpilled();
}
- if (inMemSorter != null) {
+ if (inMemSorter != null && inMemSorter.numRecords() > 0) {
UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
moveOver(iter, startIndex - i);
queue.add(iter);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 9aaa370..5885bb5 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -159,32 +159,26 @@ public final class UnsafeInMemorySorter {
return (int) (array.size() / (radixSortSupport != null ? 2 : 1.5));
}
+ public long getInitialSize() {
+ return initialSize;
+ }
+
/**
* Free the memory used by pointer array.
*/
- public void free() {
+ public void freeMemory() {
if (consumer != null) {
if (array != null) {
consumer.freeArray(array);
}
- array = null;
- }
- }
- public void reset() {
- if (consumer != null) {
- consumer.freeArray(array);
- // the call to consumer.allocateArray may trigger a spill which in turn access this instance
- // and eventually re-enter this method and try to free the array again. by setting the array
- // to null and its length to 0 we effectively make the spill code-path a no-op. setting the
- // array to null also indicates that it has already been de-allocated which prevents a double
- // de-allocation in free().
+ // Set the array to null instead of allocating a new array. Allocating an array could have
+ // triggered another spill and this method already is called from UnsafeExternalSorter when
+ // spilling. Attempting to allocate while spilling is dangerous, as we could be holding onto
+ // a large partially complete allocation, which may prevent other memory from being allocated.
+ // Instead we will allocate the new array when it is necessary.
array = null;
usableCapacity = 0;
- pos = 0;
- nullBoundaryPos = 0;
- array = consumer.allocateArray(initialSize);
- usableCapacity = getUsableCapacity();
}
pos = 0;
nullBoundaryPos = 0;
@@ -217,16 +211,18 @@ public final class UnsafeInMemorySorter {
}
public void expandPointerArray(LongArray newArray) {
- if (newArray.size() < array.size()) {
- throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
+ if (array != null) {
+ if (newArray.size() < array.size()) {
+ throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
+ }
+ Platform.copyMemory(
+ array.getBaseObject(),
+ array.getBaseOffset(),
+ newArray.getBaseObject(),
+ newArray.getBaseOffset(),
+ pos * 8L);
+ consumer.freeArray(array);
}
- Platform.copyMemory(
- array.getBaseObject(),
- array.getBaseOffset(),
- newArray.getBaseObject(),
- newArray.getBaseOffset(),
- pos * 8L);
- consumer.freeArray(array);
array = newArray;
usableCapacity = getUsableCapacity();
}
@@ -345,6 +341,11 @@ public final class UnsafeInMemorySorter {
* {@code next()} will return the same mutable object.
*/
public UnsafeSorterIterator getSortedIterator() {
+ if (numRecords() == 0) {
+ // `array` might be null, so make sure that it is not accessed by returning early.
+ return new SortedIterator(0, 0);
+ }
+
int offset = 0;
long start = System.nanoTime();
if (sortComparator != null) {
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 41813dd..eefd0c9 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.UUID;
-import org.hamcrest.Matchers;
import scala.Tuple2$;
import org.junit.After;
@@ -544,40 +543,28 @@ public class UnsafeExternalSorterSuite {
}
@Test
- public void testOOMDuringSpill() throws Exception {
+ public void testNoOOMDuringSpill() throws Exception {
final UnsafeExternalSorter sorter = newSorter();
- // we assume that given default configuration,
- // the size of the data we insert to the sorter (ints)
- // and assuming we shouldn't spill before pointers array is exhausted
- // (memory manager is not configured to throw at this point)
- // - so this loop runs a reasonable number of iterations (<2000).
- // test indeed completed within <30ms (on a quad i7 laptop).
- for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+ for (int i = 0; i < 100; i++) {
insertNumber(sorter, i);
}
- // we expect the next insert to attempt growing the pointerssArray first
- // allocation is expected to fail, then a spill is triggered which
- // attempts another allocation which also fails and we expect to see this
- // OOM here. the original code messed with a released array within the
- // spill code and ended up with a failed assertion. we also expect the
- // location of the OOM to be
- // org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
- memoryManager.markconsequentOOM(2);
- try {
- insertNumber(sorter, 1024);
- fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
- }
- // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
- catch (OutOfMemoryError oom){
- String oomStackTrace = Utils.exceptionString(oom);
- assertThat("expected OutOfMemoryError in " +
- "org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset",
- oomStackTrace,
- Matchers.containsString(
- "org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"));
+
+ // Check that spilling still succeeds when the task is starved for memory.
+ memoryManager.markconsequentOOM(Integer.MAX_VALUE);
+ sorter.spill();
+ memoryManager.resetConsequentOOM();
+
+ // Ensure that records can be appended after spilling, i.e. check that the sorter will allocate
+ // the new pointer array that it could not allocate while spilling.
+ for (int i = 0; i < 100; ++i) {
+ insertNumber(sorter, i);
}
+
+ sorter.cleanupResources();
+ assertSpillFilesWereCleanedUp();
}
+
private void verifyIntIterator(UnsafeSorterIterator iter, int start, int end)
throws IOException {
for (int i = start; i < end; i++) {
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 85ffdca4..9ef83ed 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import org.apache.spark.unsafe.array.LongArray;
import org.junit.Assert;
import org.junit.Test;
@@ -35,7 +36,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.isIn;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
public class UnsafeInMemorySorterSuite {
@@ -143,7 +143,7 @@ public class UnsafeInMemorySorterSuite {
}
@Test
- public void freeAfterOOM() {
+ public void testNoOOMDuringReset() {
final SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.memory.offHeap.enabled", "false");
@@ -152,12 +152,7 @@ public class UnsafeInMemorySorterSuite {
final TaskMemoryManager memoryManager = new TaskMemoryManager(
testMemoryManager, 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
- final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
- final Object baseObject = dataPage.getBaseObject();
- // Write the records into the data page:
- long position = dataPage.getBaseOffset();
- final HashPartitioner hashPartitioner = new HashPartitioner(4);
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
final PrefixComparator prefixComparator = PrefixComparators.LONG;
final RecordComparator recordComparator = new RecordComparator() {
@@ -175,18 +170,24 @@ public class UnsafeInMemorySorterSuite {
UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
recordComparator, prefixComparator, 100, shouldUseRadixSort());
- testMemoryManager.markExecutionAsOutOfMemoryOnce();
- try {
- sorter.reset();
- fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
- } catch (OutOfMemoryError oom) {
- // as expected
- }
- // [SPARK-21907] this failed on NPE at
- // org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
- sorter.free();
- // simulate a 'back to back' free.
- sorter.free();
+ // Ensure that the sorter does not OOM while freeing its memory.
+ testMemoryManager.markconsequentOOM(Integer.MAX_VALUE);
+ sorter.freeMemory();
+ testMemoryManager.resetConsequentOOM();
+ Assert.assertFalse(sorter.hasSpaceForAnotherRecord());
+
+ // Get the sorter in a usable state again by allocating a new pointer array.
+ LongArray array = consumer.allocateArray(1000);
+ sorter.expandPointerArray(array);
+
+ // Ensure that it is safe to call freeMemory() multiple times.
+ testMemoryManager.markconsequentOOM(Integer.MAX_VALUE);
+ sorter.freeMemory();
+ sorter.freeMemory();
+ testMemoryManager.resetConsequentOOM();
+ Assert.assertFalse(sorter.hasSpaceForAnotherRecord());
+
+ assertEquals(0L, memoryManager.cleanUpAllAllocatedMemory());
}
}
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index c26945f..7b8e271 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -69,8 +69,16 @@ class TestMemoryManager(conf: SparkConf)
consequentOOM += n
}
- def limit(avail: Long): Unit = {
- available = avail
+ /**
+ * Undoes the effects of [[markExecutionAsOutOfMemoryOnce]] and [[markconsequentOOM]] and lets
+ * calls to [[acquireExecutionMemory()]] succeed (if there is enough memory available).
+ */
+ def resetConsequentOOM(): Unit = synchronized {
+ consequentOOM = 0
}
+ def limit(avail: Long): Unit = synchronized {
+ require(avail >= 0)
+ available = avail
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org