You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2020/09/17 10:44:08 UTC

[spark] branch branch-2.4 updated: [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 2fa68a6  [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls
2fa68a6 is described below

commit 2fa68a669cc83521c7257d844202790933ae9771
Author: Tom van Bussel <to...@databricks.com>
AuthorDate: Thu Sep 17 12:35:40 2020 +0200

    [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls
    
    ### What changes were proposed in this pull request?
    
    This PR changes the way `UnsafeExternalSorter.SpillableIterator` checks whether it has already spilled: it now checks whether `inMemSorter` is null. It also allows the iterator to spill `UnsafeSorterIterator`s other than `UnsafeInMemorySorter.SortedIterator`.
    
    ### Why are the changes needed?
    
    Before this PR, `UnsafeExternalSorter.SpillableIterator` could not spill when there are NULLs in the input and radix sorting is used. Currently, Spark determines whether `UnsafeExternalSorter.SpillableIterator` has not spilled yet by checking whether `upstream` is an instance of `UnsafeInMemorySorter.SortedIterator`. When radix sorting is used and there are NULLs in the input, however, `upstream` will be an instance of `UnsafeExternalSorter.ChainedIterator` instead, and Spark will assume [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    A test was added to `UnsafeExternalSorterSuite` (and therefore also to `UnsafeExternalSorterRadixSortSuite`). I manually confirmed that the test failed in `UnsafeExternalSorterRadixSortSuite` without this patch.
    
    Closes #29772 from tomvanbussel/SPARK-32900.
    
    Authored-by: Tom van Bussel <to...@databricks.com>
    Signed-off-by: herman <he...@databricks.com>
    (cherry picked from commit e5e54a3614ffd2a9150921e84e5b813d5cbf285a)
    Signed-off-by: herman <he...@databricks.com>
---
 .../unsafe/sort/UnsafeExternalSorter.java          | 69 +++++++++++++---------
 .../unsafe/sort/UnsafeInMemorySorter.java          |  1 +
 .../unsafe/sort/UnsafeSorterIterator.java          |  2 +
 .../unsafe/sort/UnsafeSorterSpillMerger.java       |  5 ++
 .../unsafe/sort/UnsafeSorterSpillReader.java       |  5 ++
 .../unsafe/sort/UnsafeExternalSorterSuite.java     | 33 +++++++++++
 6 files changed, 88 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index a6a2076..f720ccd 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -505,11 +505,15 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
    */
   class SpillableIterator extends UnsafeSorterIterator {
     private UnsafeSorterIterator upstream;
-    private UnsafeSorterIterator nextUpstream = null;
     private MemoryBlock lastPage = null;
     private boolean loaded = false;
     private int numRecords = 0;
 
+    private Object currentBaseObject;
+    private long currentBaseOffset;
+    private int currentRecordLength;
+    private long currentKeyPrefix;
+
     SpillableIterator(UnsafeSorterIterator inMemIterator) {
       this.upstream = inMemIterator;
       this.numRecords = inMemIterator.getNumRecords();
@@ -520,23 +524,26 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       return numRecords;
     }
 
+    @Override
+    public long getCurrentPageNumber() {
+      throw new UnsupportedOperationException();
+    }
+
     public long spill() throws IOException {
       synchronized (this) {
-        if (!(upstream instanceof UnsafeInMemorySorter.SortedIterator && nextUpstream == null
-          && numRecords > 0)) {
+        if (inMemSorter == null || numRecords <= 0) {
           return 0L;
         }
 
-        UnsafeInMemorySorter.SortedIterator inMemIterator =
-          ((UnsafeInMemorySorter.SortedIterator) upstream).clone();
+        long currentPageNumber = upstream.getCurrentPageNumber();
 
-       ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
+        ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
         // Iterate over the records that have not been returned and spill them.
         final UnsafeSorterSpillWriter spillWriter =
           new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
-        spillIterator(inMemIterator, spillWriter);
+        spillIterator(upstream, spillWriter);
         spillWriters.add(spillWriter);
-        nextUpstream = spillWriter.getReader(serializerManager);
+        upstream = spillWriter.getReader(serializerManager);
 
         long released = 0L;
         synchronized (UnsafeExternalSorter.this) {
@@ -544,8 +551,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
           // is accessing the current record. We free this page in that caller's next loadNext()
           // call.
           for (MemoryBlock page : allocatedPages) {
-            if (!loaded || page.pageNumber !=
-                    ((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) {
+            if (!loaded || page.pageNumber != currentPageNumber) {
               released += page.size();
               freePage(page);
             } else {
@@ -579,22 +585,26 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       try {
         synchronized (this) {
           loaded = true;
-          if (nextUpstream != null) {
-            // Just consumed the last record from in memory iterator
-            if(lastPage != null) {
-              // Do not free the page here, while we are locking `SpillableIterator`. The `freePage`
-              // method locks the `TaskMemoryManager`, and it's a bad idea to lock 2 objects in
-              // sequence. We may hit dead lock if another thread locks `TaskMemoryManager` and
-              // `SpillableIterator` in sequence, which may happen in
-              // `TaskMemoryManager.acquireExecutionMemory`.
-              pageToFree = lastPage;
-              lastPage = null;
-            }
-            upstream = nextUpstream;
-            nextUpstream = null;
+          // Just consumed the last record from in memory iterator
+          if (lastPage != null) {
+            // Do not free the page here, while we are locking `SpillableIterator`. The `freePage`
+            // method locks the `TaskMemoryManager`, and it's a bad idea to lock 2 objects in
+            // sequence. We may hit dead lock if another thread locks `TaskMemoryManager` and
+            // `SpillableIterator` in sequence, which may happen in
+            // `TaskMemoryManager.acquireExecutionMemory`.
+            pageToFree = lastPage;
+            lastPage = null;
           }
           numRecords--;
           upstream.loadNext();
+
+          // Keep track of the current base object, base offset, record length, and key prefix,
+          // so that the current record can still be read in case a spill is triggered and we
+          // switch to the spill writer's iterator.
+          currentBaseObject = upstream.getBaseObject();
+          currentBaseOffset = upstream.getBaseOffset();
+          currentRecordLength = upstream.getRecordLength();
+          currentKeyPrefix = upstream.getKeyPrefix();
         }
       } finally {
         if (pageToFree != null) {
@@ -605,22 +615,22 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
 
     @Override
     public Object getBaseObject() {
-      return upstream.getBaseObject();
+      return currentBaseObject;
     }
 
     @Override
     public long getBaseOffset() {
-      return upstream.getBaseOffset();
+      return currentBaseOffset;
     }
 
     @Override
     public int getRecordLength() {
-      return upstream.getRecordLength();
+      return currentRecordLength;
     }
 
     @Override
     public long getKeyPrefix() {
-      return upstream.getKeyPrefix();
+      return currentKeyPrefix;
     }
   }
 
@@ -698,6 +708,11 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     }
 
     @Override
+    public long getCurrentPageNumber() {
+      return current.getCurrentPageNumber();
+    }
+
+    @Override
     public boolean hasNext() {
       while (!current.hasNext() && !iterators.isEmpty()) {
         current = iterators.remove();
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index aedc7ec..9aaa370 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -328,6 +328,7 @@ public final class UnsafeInMemorySorter {
     @Override
     public long getBaseOffset() { return baseOffset; }
 
+    @Override
     public long getCurrentPageNumber() {
       return currentPageNumber;
     }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
index 1b3167f..d9f2231 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
@@ -34,4 +34,6 @@ public abstract class UnsafeSorterIterator {
   public abstract long getKeyPrefix();
 
   public abstract int getNumRecords();
+
+  public abstract long getCurrentPageNumber();
 }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
index ab80028..f8603c5 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
@@ -71,6 +71,11 @@ final class UnsafeSorterSpillMerger {
       }
 
       @Override
+      public long getCurrentPageNumber() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
       public boolean hasNext() {
         return !priorityQueue.isEmpty() || (spillReader != null && spillReader.hasNext());
       }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index bfca670..84907bd 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -97,6 +97,11 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
   }
 
   @Override
+  public long getCurrentPageNumber() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public boolean hasNext() {
     return (numRecordsRemaining > 0);
   }
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 411cd5c..41813dd 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -355,6 +355,39 @@ public class UnsafeExternalSorterSuite {
   }
 
   @Test
+  public void forcedSpillingNullsWithReadIterator() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    long[] record = new long[100];
+    final int recordSize = record.length * 8;
+    final int n = (int) pageSizeBytes / recordSize * 3;
+    for (int i = 0; i < n; i++) {
+      boolean isNull = i % 2 == 0;
+      sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0, isNull);
+    }
+    assertTrue(sorter.getNumberOfAllocatedPages() >= 2);
+
+    UnsafeExternalSorter.SpillableIterator iter =
+            (UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator();
+    final int numRecordsToReadBeforeSpilling = n / 3;
+    for (int i = 0; i < numRecordsToReadBeforeSpilling; i++) {
+      assertTrue(iter.hasNext());
+      iter.loadNext();
+    }
+
+    assertTrue(iter.spill() > 0);
+    assertEquals(0, iter.spill());
+
+    for (int i = numRecordsToReadBeforeSpilling; i < n; i++) {
+      assertTrue(iter.hasNext());
+      iter.loadNext();
+    }
+    assertFalse(iter.hasNext());
+
+    sorter.cleanupResources();
+    assertSpillFilesWereCleanedUp();
+  }
+
+  @Test
   public void forcedSpillingWithNotReadIterator() throws Exception {
     final UnsafeExternalSorter sorter = newSorter();
     long[] record = new long[100];


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org