You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/08/19 18:27:38 UTC

spark git commit: [SPARK-17113] [SHUFFLE] Job failure due to Executor OOM in offheap mode

Repository: spark
Updated Branches:
  refs/heads/master 071eaaf9d -> cf0cce903


[SPARK-17113] [SHUFFLE] Job failure due to Executor OOM in offheap mode

## What changes were proposed in this pull request?

This PR fixes an executor OOM in offheap mode caused by a bug in cooperative memory management for UnsafeExternalSorter. UnsafeExternalSorter was checking whether a memory page is being used by upstream by comparing the base object address of the current page with the base object address of upstream. However, in the case of offheap memory allocation, the base object addresses are always null, so no spilling happened and the operator would eventually OOM.

The following is the stack trace of the failure this PR addresses:
java.lang.OutOfMemoryError: Unable to acquire 1220 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:341)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:362)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:93)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170)

## How was this patch tested?

Tested by running the failing job.

Author: Sital Kedia <sk...@fb.com>

Closes #14693 from sitalkedia/fix_offheap_oom.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf0cce90
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf0cce90
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf0cce90

Branch: refs/heads/master
Commit: cf0cce90364d17afe780ff9a5426dfcefa298535
Parents: 071eaaf
Author: Sital Kedia <sk...@fb.com>
Authored: Fri Aug 19 11:27:30 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Fri Aug 19 11:27:30 2016 -0700

----------------------------------------------------------------------
 .../util/collection/unsafe/sort/UnsafeExternalSorter.java     | 2 +-
 .../util/collection/unsafe/sort/UnsafeInMemorySorter.java     | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cf0cce90/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 8d596f8..ccf7664 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -522,7 +522,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
           // is accessing the current record. We free this page in that caller's next loadNext()
           // call.
           for (MemoryBlock page : allocatedPages) {
-            if (!loaded || page.getBaseObject() != upstream.getBaseObject()) {
+            if (!loaded || page.pageNumber != ((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) {
               released += page.size();
               freePage(page);
             } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/cf0cce90/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 78da389..30d0f30 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -248,6 +248,7 @@ public final class UnsafeInMemorySorter {
     private long baseOffset;
     private long keyPrefix;
     private int recordLength;
+    private long currentPageNumber;
 
     private SortedIterator(int numRecords, int offset) {
       this.numRecords = numRecords;
@@ -262,6 +263,7 @@ public final class UnsafeInMemorySorter {
       iter.baseOffset = baseOffset;
       iter.keyPrefix = keyPrefix;
       iter.recordLength = recordLength;
+      iter.currentPageNumber = currentPageNumber;
       return iter;
     }
 
@@ -279,6 +281,7 @@ public final class UnsafeInMemorySorter {
     public void loadNext() {
       // This pointer points to a 4-byte record length, followed by the record's bytes
       final long recordPointer = array.get(offset + position);
+      currentPageNumber = memoryManager.decodePageNumber(recordPointer);
       baseObject = memoryManager.getPage(recordPointer);
       baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4;  // Skip over record length
       recordLength = Platform.getInt(baseObject, baseOffset - 4);
@@ -292,6 +295,10 @@ public final class UnsafeInMemorySorter {
     @Override
     public long getBaseOffset() { return baseOffset; }
 
+    public long getCurrentPageNumber() {
+      return currentPageNumber;
+    }
+
     @Override
     public int getRecordLength() { return recordLength; }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org