Posted to commits@spark.apache.org by hv...@apache.org on 2020/10/08 08:01:43 UTC

[spark] branch branch-2.4 updated: [SPARK-32901][CORE][2.4] Do not allocate memory while spilling UnsafeExternalSorter

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 45a8b89  [SPARK-32901][CORE][2.4] Do not allocate memory while spilling UnsafeExternalSorter
45a8b89 is described below

commit 45a8b892455daaad34d97e37356dee85256d316d
Author: Tom van Bussel <to...@databricks.com>
AuthorDate: Thu Oct 8 09:58:01 2020 +0200

    [SPARK-32901][CORE][2.4] Do not allocate memory while spilling UnsafeExternalSorter
    
    Backport of #29785 to Spark 2.4
    
    ### What changes were proposed in this pull request?
    
    This PR changes `UnsafeExternalSorter` to no longer allocate any memory while spilling. In particular, it removes the allocation of a new pointer array in `UnsafeInMemorySorter`. Instead, the new pointer array is allocated when the next record is inserted into the sorter (see the sketch below).
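
    The core of the change, abbreviated from the `UnsafeInMemorySorter` diff below: `reset()` is replaced by a `freeMemory()` that only releases the pointer array and defers re-allocation to the next insert:

        public void freeMemory() {
          if (consumer != null) {
            if (array != null) {
              consumer.freeArray(array);
            }
            // Set the array to null instead of allocating a replacement: this
            // method is called while spilling, and allocating here could trigger
            // another spill or fail outright. The next insert re-allocates it.
            array = null;
            usableCapacity = 0;
          }
          pos = 0;
          nullBoundaryPos = 0;
        }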
    
    ### Why are the changes needed?
    
    Without this change, `UnsafeExternalSorter` could throw an OOM while spilling. The following sequence of events could trigger it:
    
    1. `UnsafeExternalSorter` runs out of space in its pointer array and attempts to allocate a new large array to replace the old one.
    2. `TaskMemoryManager` tries to allocate the memory backing the new large array using `MemoryManager`, but `MemoryManager` is only willing to return most, but not all, of the memory requested.
    3. `TaskMemoryManager` asks `UnsafeExternalSorter` to spill, which causes `UnsafeExternalSorter` to spill the current run to disk, to free its record pages and to reset its `UnsafeInMemorySorter`.
    4. `UnsafeInMemorySorter` frees the old pointer array, and tries to allocate a new small pointer array.
    5. `TaskMemoryManager` tries to allocate the memory backing the small array using `MemoryManager`, but `MemoryManager` is unwilling to give it any memory, as the `TaskMemoryManager` is still holding on to the memory it got for the new large array.
    6. `TaskMemoryManager` again asks `UnsafeExternalSorter` to spill, but this time there is nothing to spill.
    7. `UnsafeInMemorySorter` receives less memory than it requested and throws a `SparkOutOfMemoryError`, which fails the current task (see the sketch after this list).
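
    Step 5 is the crux: the old `UnsafeInMemorySorter.reset()` (removed by the diff below) re-allocated a pointer array from inside the spill path, while the partially granted large allocation was still charged to the task:

        // Old code, removed by this patch: reset() allocates while spilling.
        public void reset() {
          if (consumer != null) {
            consumer.freeArray(array);
            array = null;
            usableCapacity = 0;
            pos = 0;
            nullBoundaryPos = 0;
            // This allocation runs while the task still holds the partially
            // granted large array (step 5), so it can be refused; the resulting
            // SparkOutOfMemoryError fails the task (steps 6 and 7).
            array = consumer.allocateArray(initialSize);
            usableCapacity = getUsableCapacity();
          }
          pos = 0;
          nullBoundaryPos = 0;
        }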
    
    With the changes in this PR, the following happens instead:
    
    1. `UnsafeExternalSorter` runs out of space in its pointer array and attempts to allocate a new large array to replace the old one.
    2. `TaskMemoryManager` tries to allocate the memory backing the new large array using `MemoryManager`, but `MemoryManager` is only willing to return most, but not all, of the memory requested.
    3. `TaskMemoryManager` asks `UnsafeExternalSorter` to spill, which causes `UnsafeExternalSorter` to spill the current run to disk, to free its record pages and to reset its `UnsafeInMemorySorter`.
    4. `UnsafeInMemorySorter` frees the old pointer array.
    5. `TaskMemoryManager` returns control to `UnsafeExternalSorter.growPointerArrayIfNecessary` (either by returning the new large array or by throwing a `SparkOutOfMemoryError`).
    6. `UnsafeExternalSorter` either frees the new large array or it ignores the `SparkOutOfMemoryError` depending on what happened in the previous step.
    7. `UnsafeExternalSorter` successfully allocates a new small pointer array, and the operation continues as normal (the reworked method is consolidated after this list).
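
    For reference, the reworked `UnsafeExternalSorter.growPointerArrayIfNecessary` consolidated from the diff below (comments abbreviated):

        private void growPointerArrayIfNecessary() throws IOException {
          assert(inMemSorter != null);
          if (!inMemSorter.hasSpaceForAnotherRecord()) {
            if (inMemSorter.numRecords() <= 0) {
              // A spill just freed the pointer array; allocate a fresh small one.
              LongArray array = allocateArray(inMemSorter.getInitialSize());
              inMemSorter.expandPointerArray(array);
              return;
            }

            long used = inMemSorter.getMemoryUsage();
            LongArray array = null;
            try {
              // Could trigger spilling.
              array = allocateArray(used / 8 * 2);
            } catch (TooLargePageException e) {
              // The pointer array is too big to fit in a single page, spill.
              spill();
            } catch (SparkOutOfMemoryError e) {
              if (inMemSorter.numRecords() > 0) {
                logger.error("Unable to grow the pointer array");
                throw e;
              }
              // The allocation failed, but all records were spilled, so the
              // large array is no longer needed; ignore the error.
            }

            if (inMemSorter.numRecords() <= 0) {
              // Spilling was triggered while allocating the new array.
              if (array != null) {
                // The large array was granted after all, but since all records
                // were spilled, a smaller array suffices.
                freeArray(array);
              }
              // The spill freed the pointer array; allocate a fresh small one.
              array = allocateArray(inMemSorter.getInitialSize());
            }
            inMemSorter.expandPointerArray(array);
          }
        }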
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Tests were added in `UnsafeExternalSorterSuite` and `UnsafeInMemorySorterSuite`.
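
    The core of the new `UnsafeExternalSorterSuite` check, abbreviated from the diff below: starve the task of memory, spill, and verify that records can still be inserted afterwards:

        final UnsafeExternalSorter sorter = newSorter();
        for (int i = 0; i < 100; i++) {
          insertNumber(sorter, i);
        }
        // Spilling must still succeed when the task is starved for memory.
        memoryManager.markconsequentOOM(Integer.MAX_VALUE);
        sorter.spill();
        memoryManager.resetConsequentOOM();
        // Inserts after the spill exercise the lazy re-allocation of the
        // pointer array that was freed (not re-allocated) during the spill.
        for (int i = 0; i < 100; ++i) {
          insertNumber(sorter, i);
        }
        sorter.cleanupResources();
        assertSpillFilesWereCleanedUp();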
    
    Closes #29910 from tomvanbussel/backport-SPARK-32901.
    
    Authored-by: Tom van Bussel <to...@databricks.com>
    Signed-off-by: herman <he...@databricks.com>
---
 .../unsafe/sort/UnsafeExternalSorter.java          | 96 ++++++++++++++++------
 .../unsafe/sort/UnsafeInMemorySorter.java          | 51 ++++++------
 .../unsafe/sort/UnsafeExternalSorterSuite.java     | 45 ++++------
 .../unsafe/sort/UnsafeInMemorySorterSuite.java     | 39 ++++-----
 .../apache/spark/memory/TestMemoryManager.scala    | 12 ++-
 5 files changed, 143 insertions(+), 100 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index f720ccd..9552e79 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -203,6 +203,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     }
 
     if (inMemSorter == null || inMemSorter.numRecords() <= 0) {
+      // There could still be some memory allocated when there are no records in the in-memory
+      // sorter. We will not spill it, however, to ensure that we can always process at least one
+      // record before spilling. See the comments in `allocateMemoryForRecordIfNecessary` for why
+      // this is necessary.
       return 0L;
     }
 
@@ -226,7 +230,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     // Note that this is more-or-less going to be a multiple of the page size, so wasted space in
     // pages will currently be counted as memory spilled even though that space isn't actually
     // written to disk. This also counts the space needed to store the sorter's pointer array.
-    inMemSorter.reset();
+    inMemSorter.freeMemory();
     // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
     // records. Otherwise, if the task is over allocated memory, then without freeing the memory
     // pages, we might not be able to get memory for the pointer array.
@@ -327,7 +331,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       deleteSpillFiles();
       freeMemory();
       if (inMemSorter != null) {
-        inMemSorter.free();
+        inMemSorter.freeMemory();
         inMemSorter = null;
       }
     }
@@ -341,40 +345,53 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private void growPointerArrayIfNecessary() throws IOException {
     assert(inMemSorter != null);
     if (!inMemSorter.hasSpaceForAnotherRecord()) {
+      if (inMemSorter.numRecords() <= 0) {
+        // Spilling was triggered just before this method was called. The pointer array was freed
+        // during the spill, so a new pointer array needs to be allocated here.
+        LongArray array = allocateArray(inMemSorter.getInitialSize());
+        inMemSorter.expandPointerArray(array);
+        return;
+      }
+
       long used = inMemSorter.getMemoryUsage();
-      LongArray array;
+      LongArray array = null;
       try {
         // could trigger spilling
         array = allocateArray(used / 8 * 2);
       } catch (TooLargePageException e) {
        // The pointer array is too big to fit in a single page, spill.
         spill();
-        return;
       } catch (SparkOutOfMemoryError e) {
-        // should have trigger spilling
-        if (!inMemSorter.hasSpaceForAnotherRecord()) {
+        if (inMemSorter.numRecords() > 0) {
           logger.error("Unable to grow the pointer array");
           throw e;
         }
-        return;
+        // The new array could not be allocated, but that is not an issue as it is no longer
+        // needed: all records were spilled.
       }
-      // check if spilling is triggered or not
-      if (inMemSorter.hasSpaceForAnotherRecord()) {
-        freeArray(array);
-      } else {
-        inMemSorter.expandPointerArray(array);
+
+      if (inMemSorter.numRecords() <= 0) {
+        // Spilling was triggered while trying to allocate the new array.
+        if (array != null) {
+          // We succeeded in allocating the new array, but, since all records were spilled, a
+          // smaller array would also suffice.
+          freeArray(array);
+        }
+        // The pointer array was freed during the spill, so a new pointer array needs to be
+        // allocated here.
+        array = allocateArray(inMemSorter.getInitialSize());
       }
+      inMemSorter.expandPointerArray(array);
     }
   }
 
   /**
-   * Allocates more memory in order to insert an additional record. This will request additional
-   * memory from the memory manager and spill if the requested memory can not be obtained.
+   * Allocates an additional page in order to insert an additional record. This will request
+   * additional memory from the memory manager and spill if the requested memory can not be
+   * obtained.
    *
    * @param required the required space in the data page, in bytes, including space for storing
-   *                      the record size. This must be less than or equal to the page size (records
-   *                      that exceed the page size are handled via a different code path which uses
-   *                      special overflow pages).
+   *                 the record size.
    */
   private void acquireNewPageIfNecessary(int required) {
     if (currentPage == null ||
@@ -387,6 +404,37 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   }
 
   /**
+   * Allocates more memory in order to insert an additional record. This will request additional
+   * memory from the memory manager and spill if the requested memory can not be obtained.
+   *
+   * @param required the required space in the data page, in bytes, including space for storing
+   *                 the record size.
+   */
+  private void allocateMemoryForRecordIfNecessary(int required) throws IOException {
+    // Step 1:
+    // Ensure that the pointer array has space for another record. This may cause a spill.
+    growPointerArrayIfNecessary();
+    // Step 2:
+    // Ensure that the last page has space for another record. This may cause a spill.
+    acquireNewPageIfNecessary(required);
+    // Step 3:
+    // The allocation in step 2 could have caused a spill, which would have freed the pointer
+    // array allocated in step 1. Therefore we need to check again whether we have to allocate
+    // a new pointer array.
+    //
+    // If the allocation in this step causes a spill event then it will not cause the page
+    // allocated in the previous step to be freed. The function `spill` only frees memory if at
+    // least one record has been inserted in the in-memory sorter. This will not be the case if
+    // we have spilled in the previous step.
+    //
+    // If we did not spill in the previous step then `growPointerArrayIfNecessary` will be a
+    // no-op that does not allocate any memory, and therefore can't cause a spill event.
+    //
+    // Thus there is no need to call `acquireNewPageIfNecessary` again after this step.
+    growPointerArrayIfNecessary();
+  }
+
+  /**
    * Write a record to the sorter.
    */
   public void insertRecord(
@@ -400,11 +448,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       spill();
     }
 
-    growPointerArrayIfNecessary();
-    int uaoSize = UnsafeAlignedOffset.getUaoSize();
+    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
     // Need 4 or 8 bytes to store the record length.
     final int required = length + uaoSize;
-    acquireNewPageIfNecessary(required);
+    allocateMemoryForRecordIfNecessary(required);
 
     final Object base = currentPage.getBaseObject();
     final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
@@ -427,10 +474,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       Object valueBase, long valueOffset, int valueLen, long prefix, boolean prefixIsNull)
     throws IOException {
 
-    growPointerArrayIfNecessary();
-    int uaoSize = UnsafeAlignedOffset.getUaoSize();
+    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
     final int required = keyLen + valueLen + (2 * uaoSize);
-    acquireNewPageIfNecessary(required);
+    allocateMemoryForRecordIfNecessary(required);
 
     final Object base = currentPage.getBaseObject();
     final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
@@ -565,7 +611,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
         assert(inMemSorter != null);
         released += inMemSorter.getMemoryUsage();
         totalSortTimeNanos += inMemSorter.getSortTimeNanos();
-        inMemSorter.free();
+        inMemSorter.freeMemory();
         inMemSorter = null;
         taskContext.taskMetrics().incMemoryBytesSpilled(released);
         taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
@@ -660,7 +706,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
         }
         i += spillWriter.recordsSpilled();
       }
-      if (inMemSorter != null) {
+      if (inMemSorter != null && inMemSorter.numRecords() > 0) {
         UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
         moveOver(iter, startIndex - i);
         queue.add(iter);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 9aaa370..5885bb5 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -159,32 +159,26 @@ public final class UnsafeInMemorySorter {
     return (int) (array.size() / (radixSortSupport != null ? 2 : 1.5));
   }
 
+  public long getInitialSize() {
+    return initialSize;
+  }
+
   /**
    * Free the memory used by pointer array.
    */
-  public void free() {
+  public void freeMemory() {
     if (consumer != null) {
       if (array != null) {
         consumer.freeArray(array);
       }
-      array = null;
-    }
-  }
 
-  public void reset() {
-    if (consumer != null) {
-      consumer.freeArray(array);
-      // the call to consumer.allocateArray may trigger a spill which in turn access this instance
-      // and eventually re-enter this method and try to free the array again.  by setting the array
-      // to null and its length to 0 we effectively make the spill code-path a no-op.  setting the
-      // array to null also indicates that it has already been de-allocated which prevents a double
-      // de-allocation in free().
+      // Set the array to null instead of allocating a new array. Allocating an array could have
+      // triggered another spill, and this method is already called from UnsafeExternalSorter when
+      // spilling. Attempting to allocate while spilling is dangerous, as we could be holding onto
+      // a large partially complete allocation, which may prevent other memory from being allocated.
+      // Instead we allocate the new array only when it is necessary.
       array = null;
       usableCapacity = 0;
-      pos = 0;
-      nullBoundaryPos = 0;
-      array = consumer.allocateArray(initialSize);
-      usableCapacity = getUsableCapacity();
     }
     pos = 0;
     nullBoundaryPos = 0;
@@ -217,16 +211,18 @@ public final class UnsafeInMemorySorter {
   }
 
   public void expandPointerArray(LongArray newArray) {
-    if (newArray.size() < array.size()) {
-      throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
+    if (array != null) {
+      if (newArray.size() < array.size()) {
+        throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
+      }
+      Platform.copyMemory(
+        array.getBaseObject(),
+        array.getBaseOffset(),
+        newArray.getBaseObject(),
+        newArray.getBaseOffset(),
+        pos * 8L);
+      consumer.freeArray(array);
     }
-    Platform.copyMemory(
-      array.getBaseObject(),
-      array.getBaseOffset(),
-      newArray.getBaseObject(),
-      newArray.getBaseOffset(),
-      pos * 8L);
-    consumer.freeArray(array);
     array = newArray;
     usableCapacity = getUsableCapacity();
   }
@@ -345,6 +341,11 @@ public final class UnsafeInMemorySorter {
    * {@code next()} will return the same mutable object.
    */
   public UnsafeSorterIterator getSortedIterator() {
+    if (numRecords() == 0) {
+      // `array` might be null, so make sure that it is not accessed by returning early.
+      return new SortedIterator(0, 0);
+    }
+
     int offset = 0;
     long start = System.nanoTime();
     if (sortComparator != null) {
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 41813dd..eefd0c9 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.UUID;
 
-import org.hamcrest.Matchers;
 import scala.Tuple2$;
 
 import org.junit.After;
@@ -544,40 +543,28 @@ public class UnsafeExternalSorterSuite {
   }
 
   @Test
-  public void testOOMDuringSpill() throws Exception {
+  public void testNoOOMDuringSpill() throws Exception {
     final UnsafeExternalSorter sorter = newSorter();
-    // we assume that given default configuration,
-    // the size of the data we insert to the sorter (ints)
-    // and assuming we shouldn't spill before pointers array is exhausted
-    // (memory manager is not configured to throw at this point)
-    // - so this loop runs a reasonable number of iterations (<2000).
-    // test indeed completed within <30ms (on a quad i7 laptop).
-    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+    for (int i = 0; i < 100; i++) {
       insertNumber(sorter, i);
     }
-    // we expect the next insert to attempt growing the pointerssArray first
-    // allocation is expected to fail, then a spill is triggered which
-    // attempts another allocation which also fails and we expect to see this
-    // OOM here.  the original code messed with a released array within the
-    // spill code and ended up with a failed assertion.  we also expect the
-    // location of the OOM to be
-    // org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
-    memoryManager.markconsequentOOM(2);
-    try {
-      insertNumber(sorter, 1024);
-      fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
-    }
-    // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
-    catch (OutOfMemoryError oom){
-      String oomStackTrace = Utils.exceptionString(oom);
-      assertThat("expected OutOfMemoryError in " +
-        "org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset",
-        oomStackTrace,
-        Matchers.containsString(
-          "org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"));
+
+    // Check that spilling still succeeds when the task is starved for memory.
+    memoryManager.markconsequentOOM(Integer.MAX_VALUE);
+    sorter.spill();
+    memoryManager.resetConsequentOOM();
+
+    // Ensure that records can be appended after spilling, i.e. check that the sorter will allocate
+    // the new pointer array that it could not allocate while spilling.
+    for (int i = 0; i < 100; ++i) {
+      insertNumber(sorter, i);
     }
+
+    sorter.cleanupResources();
+    assertSpillFilesWereCleanedUp();
   }
 
+
   private void verifyIntIterator(UnsafeSorterIterator iter, int start, int end)
       throws IOException {
     for (int i = start; i < end; i++) {
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 85ffdca4..9ef83ed 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection.unsafe.sort;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
+import org.apache.spark.unsafe.array.LongArray;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -35,7 +36,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.isIn;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 public class UnsafeInMemorySorterSuite {
@@ -143,7 +143,7 @@ public class UnsafeInMemorySorterSuite {
   }
 
   @Test
-  public void freeAfterOOM() {
+  public void testNoOOMDuringReset() {
     final SparkConf sparkConf = new SparkConf();
     sparkConf.set("spark.memory.offHeap.enabled", "false");
 
@@ -152,12 +152,7 @@ public class UnsafeInMemorySorterSuite {
     final TaskMemoryManager memoryManager = new TaskMemoryManager(
             testMemoryManager, 0);
     final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
-    final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
-    final Object baseObject = dataPage.getBaseObject();
-    // Write the records into the data page:
-    long position = dataPage.getBaseOffset();
 
-    final HashPartitioner hashPartitioner = new HashPartitioner(4);
     // Use integer comparison for comparing prefixes (which are partition ids, in this case)
     final PrefixComparator prefixComparator = PrefixComparators.LONG;
     final RecordComparator recordComparator = new RecordComparator() {
@@ -175,18 +170,24 @@ public class UnsafeInMemorySorterSuite {
     UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
             recordComparator, prefixComparator, 100, shouldUseRadixSort());
 
-    testMemoryManager.markExecutionAsOutOfMemoryOnce();
-    try {
-      sorter.reset();
-      fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
-    } catch (OutOfMemoryError oom) {
-      // as expected
-    }
-    // [SPARK-21907] this failed on NPE at
-    // org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
-    sorter.free();
-    // simulate a 'back to back' free.
-    sorter.free();
+    // Ensure that the sorter does not OOM while freeing its memory.
+    testMemoryManager.markconsequentOOM(Integer.MAX_VALUE);
+    sorter.freeMemory();
+    testMemoryManager.resetConsequentOOM();
+    Assert.assertFalse(sorter.hasSpaceForAnotherRecord());
+
+    // Get the sorter into a usable state again by allocating a new pointer array.
+    LongArray array = consumer.allocateArray(1000);
+    sorter.expandPointerArray(array);
+
+    // Ensure that it is safe to call freeMemory() multiple times.
+    testMemoryManager.markconsequentOOM(Integer.MAX_VALUE);
+    sorter.freeMemory();
+    sorter.freeMemory();
+    testMemoryManager.resetConsequentOOM();
+    Assert.assertFalse(sorter.hasSpaceForAnotherRecord());
+
+    assertEquals(0L, memoryManager.cleanUpAllAllocatedMemory());
   }
 
 }
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index c26945f..7b8e271 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -69,8 +69,16 @@ class TestMemoryManager(conf: SparkConf)
     consequentOOM += n
   }
 
-  def limit(avail: Long): Unit = {
-    available = avail
+  /**
+   * Undoes the effects of [[markExecutionAsOutOfMemoryOnce]] and [[markconsequentOOM]], and lets
+   * calls to [[acquireExecutionMemory()]] succeed (if there is enough memory available).
+   */
+  def resetConsequentOOM(): Unit = synchronized {
+    consequentOOM = 0
   }
 
+  def limit(avail: Long): Unit = synchronized {
+    require(avail >= 0)
+    available = avail
+  }
 }

