You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2016/02/08 21:10:42 UTC

spark git commit: [SPARK-13210][SQL] catch OOM when allocate memory and expand array

Repository: spark
Updated Branches:
  refs/heads/master 8e4d15f70 -> 37bc203c8


[SPARK-13210][SQL] catch OOM when allocate memory and expand array

There is a bug when we try to grow the buffer: the OOM is wrongly ignored (the assert is also skipped by the JVM), and then we try to grow the array again. This second attempt triggers spilling, which frees the current page, so the record we just inserted becomes invalid.

The root cause is that the JVM has less free memory than MemoryManager thought, so allocating a page hits an OOM without triggering spilling. We should catch the OOM and acquire memory again to trigger spilling.

Also, we should not grow the array in `insertRecord` of `InMemorySorter` (that was there only to make testing easier).

Author: Davies Liu <da...@databricks.com>

Closes #11095 from davies/fix_expand.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37bc203c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37bc203c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37bc203c

Branch: refs/heads/master
Commit: 37bc203c8dd5022cb11d53b697c28a737ee85bcc
Parents: 8e4d15f
Author: Davies Liu <da...@databricks.com>
Authored: Mon Feb 8 12:08:58 2016 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Feb 8 12:09:20 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/memory/TaskMemoryManager.java  | 23 +++++++++++++++++++-
 .../shuffle/sort/ShuffleExternalSorter.java     | 10 +--------
 .../shuffle/sort/ShuffleInMemorySorter.java     |  2 +-
 .../unsafe/sort/UnsafeExternalSorter.java       | 10 +--------
 .../unsafe/sort/UnsafeInMemorySorter.java       |  2 +-
 .../sort/ShuffleInMemorySorterSuite.java        |  6 +++++
 .../unsafe/sort/UnsafeInMemorySorterSuite.java  |  3 +++
 7 files changed, 35 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/37bc203c/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index d2a8886..8757dff 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -112,6 +112,11 @@ public class TaskMemoryManager {
   private final HashSet<MemoryConsumer> consumers;
 
   /**
+   * The amount of memory that is acquired but not used.
+   */
+  private long acquiredButNotUsed = 0L;
+
+  /**
    * Construct a new TaskMemoryManager.
    */
   public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
@@ -256,7 +261,20 @@ public class TaskMemoryManager {
       }
       allocatedPages.set(pageNumber);
     }
-    final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
+    MemoryBlock page = null;
+    try {
+      page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
+    } catch (OutOfMemoryError e) {
+      logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
+      // there is no enough memory actually, it means the actual free memory is smaller than
+      // MemoryManager thought, we should keep the acquired memory.
+      acquiredButNotUsed += acquired;
+      synchronized (this) {
+        allocatedPages.clear(pageNumber);
+      }
+      // this could trigger spilling to free some pages.
+      return allocatePage(size, consumer);
+    }
     page.pageNumber = pageNumber;
     pageTable[pageNumber] = page;
     if (logger.isTraceEnabled()) {
@@ -378,6 +396,9 @@ public class TaskMemoryManager {
     }
     Arrays.fill(pageTable, null);
 
+    // release the memory that is not used by any consumer.
+    memoryManager.releaseExecutionMemory(acquiredButNotUsed, taskAttemptId, tungstenMemoryMode);
+
     return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/37bc203c/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 2c84de5..f97e76d 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -320,15 +320,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
     assert(inMemSorter != null);
     if (!inMemSorter.hasSpaceForAnotherRecord()) {
       long used = inMemSorter.getMemoryUsage();
-      LongArray array;
-      try {
-        // could trigger spilling
-        array = allocateArray(used / 8 * 2);
-      } catch (OutOfMemoryError e) {
-        // should have trigger spilling
-        assert(inMemSorter.hasSpaceForAnotherRecord());
-        return;
-      }
+      LongArray array = allocateArray(used / 8 * 2);
       // check if spilling is triggered or not
       if (inMemSorter.hasSpaceForAnotherRecord()) {
         freeArray(array);

http://git-wip-us.apache.org/repos/asf/spark/blob/37bc203c/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index 58ad88e..d74602c 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -104,7 +104,7 @@ final class ShuffleInMemorySorter {
    */
   public void insertRecord(long recordPointer, int partitionId) {
     if (!hasSpaceForAnotherRecord()) {
-      expandPointerArray(consumer.allocateArray(array.size() * 2));
+      throw new IllegalStateException("There is no space for new record");
     }
     array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
     pos++;

http://git-wip-us.apache.org/repos/asf/spark/blob/37bc203c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index a6edc1a..296bf72 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -293,15 +293,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     assert(inMemSorter != null);
     if (!inMemSorter.hasSpaceForAnotherRecord()) {
       long used = inMemSorter.getMemoryUsage();
-      LongArray array;
-      try {
-        // could trigger spilling
-        array = allocateArray(used / 8 * 2);
-      } catch (OutOfMemoryError e) {
-        // should have trigger spilling
-        assert(inMemSorter.hasSpaceForAnotherRecord());
-        return;
-      }
+      LongArray array = allocateArray(used / 8 * 2);
       // check if spilling is triggered or not
       if (inMemSorter.hasSpaceForAnotherRecord()) {
         freeArray(array);

http://git-wip-us.apache.org/repos/asf/spark/blob/37bc203c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index d1b0bc5..cea0f0a 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -164,7 +164,7 @@ public final class UnsafeInMemorySorter {
    */
   public void insertRecord(long recordPointer, long keyPrefix) {
     if (!hasSpaceForAnotherRecord()) {
-      expandPointerArray(consumer.allocateArray(array.size() * 2));
+      throw new IllegalStateException("There is no space for new record");
     }
     array.set(pos, recordPointer);
     pos++;

http://git-wip-us.apache.org/repos/asf/spark/blob/37bc203c/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index 0328e63..eb1da8e 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -75,6 +75,9 @@ public class ShuffleInMemorySorterSuite {
     // Write the records into the data page and store pointers into the sorter
     long position = dataPage.getBaseOffset();
     for (String str : dataToSort) {
+      if (!sorter.hasSpaceForAnotherRecord()) {
+        sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
+      }
       final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position);
       final byte[] strBytes = str.getBytes("utf-8");
       Platform.putInt(baseObject, position, strBytes.length);
@@ -114,6 +117,9 @@ public class ShuffleInMemorySorterSuite {
     int[] numbersToSort = new int[128000];
     Random random = new Random(16);
     for (int i = 0; i < numbersToSort.length; i++) {
+      if (!sorter.hasSpaceForAnotherRecord()) {
+        sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
+      }
       numbersToSort[i] = random.nextInt(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1);
       sorter.insertRecord(0, numbersToSort[i]);
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/37bc203c/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 93efd03..8e557ec 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -111,6 +111,9 @@ public class UnsafeInMemorySorterSuite {
     // Given a page of records, insert those records into the sorter one-by-one:
     position = dataPage.getBaseOffset();
     for (int i = 0; i < dataToSort.length; i++) {
+      if (!sorter.hasSpaceForAnotherRecord()) {
+        sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
+      }
       // position now points to the start of a record (which holds its length).
       final int recordLength = Platform.getInt(baseObject, position);
       final long address = memoryManager.encodePageNumberAndOffset(dataPage, position);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org