You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2020/09/17 10:44:08 UTC
[spark] branch branch-2.4 updated: [SPARK-32900][CORE] Allow
UnsafeExternalSorter to spill when there are nulls
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 2fa68a6 [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls
2fa68a6 is described below
commit 2fa68a669cc83521c7257d844202790933ae9771
Author: Tom van Bussel <to...@databricks.com>
AuthorDate: Thu Sep 17 12:35:40 2020 +0200
[SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls
### What changes were proposed in this pull request?
This PR changes the way `UnsafeExternalSorter.SpillableIterator` checks whether it has spilled already, by checking whether `inMemSorter` is null. It also allows it to spill other `UnsafeSorterIterator`s than `UnsafeInMemorySorter.SortedIterator`.
### Why are the changes needed?
Before this PR `UnsafeExternalSorter.SpillableIterator` could not spill when there are NULLs in the input and radix sorting is used. Currently, Spark determines whether UnsafeExternalSorter.SpillableIterator has not spilled yet by checking whether `upstream` is an instance of `UnsafeInMemorySorter.SortedIterator`. When radix sorting is used and there are NULLs in the input however, `upstream` will be an instance of `UnsafeExternalSorter.ChainedIterator` instead, and Spark will assume that the iterator has already spilled, so no spilling will occur even when it is requested.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A test was added to `UnsafeExternalSorterSuite` (and therefore also to `UnsafeExternalSorterRadixSortSuite`). I manually confirmed that the test failed in `UnsafeExternalSorterRadixSortSuite` without this patch.
Closes #29772 from tomvanbussel/SPARK-32900.
Authored-by: Tom van Bussel <to...@databricks.com>
Signed-off-by: herman <he...@databricks.com>
(cherry picked from commit e5e54a3614ffd2a9150921e84e5b813d5cbf285a)
Signed-off-by: herman <he...@databricks.com>
---
.../unsafe/sort/UnsafeExternalSorter.java | 69 +++++++++++++---------
.../unsafe/sort/UnsafeInMemorySorter.java | 1 +
.../unsafe/sort/UnsafeSorterIterator.java | 2 +
.../unsafe/sort/UnsafeSorterSpillMerger.java | 5 ++
.../unsafe/sort/UnsafeSorterSpillReader.java | 5 ++
.../unsafe/sort/UnsafeExternalSorterSuite.java | 33 +++++++++++
6 files changed, 88 insertions(+), 27 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index a6a2076..f720ccd 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -505,11 +505,15 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
*/
class SpillableIterator extends UnsafeSorterIterator {
private UnsafeSorterIterator upstream;
- private UnsafeSorterIterator nextUpstream = null;
private MemoryBlock lastPage = null;
private boolean loaded = false;
private int numRecords = 0;
+ private Object currentBaseObject;
+ private long currentBaseOffset;
+ private int currentRecordLength;
+ private long currentKeyPrefix;
+
SpillableIterator(UnsafeSorterIterator inMemIterator) {
this.upstream = inMemIterator;
this.numRecords = inMemIterator.getNumRecords();
@@ -520,23 +524,26 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
return numRecords;
}
+ @Override
+ public long getCurrentPageNumber() {
+ throw new UnsupportedOperationException();
+ }
+
public long spill() throws IOException {
synchronized (this) {
- if (!(upstream instanceof UnsafeInMemorySorter.SortedIterator && nextUpstream == null
- && numRecords > 0)) {
+ if (inMemSorter == null || numRecords <= 0) {
return 0L;
}
- UnsafeInMemorySorter.SortedIterator inMemIterator =
- ((UnsafeInMemorySorter.SortedIterator) upstream).clone();
+ long currentPageNumber = upstream.getCurrentPageNumber();
- ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
+ ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
// Iterate over the records that have not been returned and spill them.
final UnsafeSorterSpillWriter spillWriter =
new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
- spillIterator(inMemIterator, spillWriter);
+ spillIterator(upstream, spillWriter);
spillWriters.add(spillWriter);
- nextUpstream = spillWriter.getReader(serializerManager);
+ upstream = spillWriter.getReader(serializerManager);
long released = 0L;
synchronized (UnsafeExternalSorter.this) {
@@ -544,8 +551,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// is accessing the current record. We free this page in that caller's next loadNext()
// call.
for (MemoryBlock page : allocatedPages) {
- if (!loaded || page.pageNumber !=
- ((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) {
+ if (!loaded || page.pageNumber != currentPageNumber) {
released += page.size();
freePage(page);
} else {
@@ -579,22 +585,26 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
try {
synchronized (this) {
loaded = true;
- if (nextUpstream != null) {
- // Just consumed the last record from in memory iterator
- if(lastPage != null) {
- // Do not free the page here, while we are locking `SpillableIterator`. The `freePage`
- // method locks the `TaskMemoryManager`, and it's a bad idea to lock 2 objects in
- // sequence. We may hit dead lock if another thread locks `TaskMemoryManager` and
- // `SpillableIterator` in sequence, which may happen in
- // `TaskMemoryManager.acquireExecutionMemory`.
- pageToFree = lastPage;
- lastPage = null;
- }
- upstream = nextUpstream;
- nextUpstream = null;
+ // Just consumed the last record from in memory iterator
+ if (lastPage != null) {
+ // Do not free the page here, while we are locking `SpillableIterator`. The `freePage`
+ // method locks the `TaskMemoryManager`, and it's a bad idea to lock 2 objects in
+ // sequence. We may hit dead lock if another thread locks `TaskMemoryManager` and
+ // `SpillableIterator` in sequence, which may happen in
+ // `TaskMemoryManager.acquireExecutionMemory`.
+ pageToFree = lastPage;
+ lastPage = null;
}
numRecords--;
upstream.loadNext();
+
+ // Keep track of the current base object, base offset, record length, and key prefix,
+ // so that the current record can still be read in case a spill is triggered and we
+ // switch to the spill writer's iterator.
+ currentBaseObject = upstream.getBaseObject();
+ currentBaseOffset = upstream.getBaseOffset();
+ currentRecordLength = upstream.getRecordLength();
+ currentKeyPrefix = upstream.getKeyPrefix();
}
} finally {
if (pageToFree != null) {
@@ -605,22 +615,22 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
@Override
public Object getBaseObject() {
- return upstream.getBaseObject();
+ return currentBaseObject;
}
@Override
public long getBaseOffset() {
- return upstream.getBaseOffset();
+ return currentBaseOffset;
}
@Override
public int getRecordLength() {
- return upstream.getRecordLength();
+ return currentRecordLength;
}
@Override
public long getKeyPrefix() {
- return upstream.getKeyPrefix();
+ return currentKeyPrefix;
}
}
@@ -698,6 +708,11 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
@Override
+ public long getCurrentPageNumber() {
+ return current.getCurrentPageNumber();
+ }
+
+ @Override
public boolean hasNext() {
while (!current.hasNext() && !iterators.isEmpty()) {
current = iterators.remove();
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index aedc7ec..9aaa370 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -328,6 +328,7 @@ public final class UnsafeInMemorySorter {
@Override
public long getBaseOffset() { return baseOffset; }
+ @Override
public long getCurrentPageNumber() {
return currentPageNumber;
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
index 1b3167f..d9f2231 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
@@ -34,4 +34,6 @@ public abstract class UnsafeSorterIterator {
public abstract long getKeyPrefix();
public abstract int getNumRecords();
+
+ public abstract long getCurrentPageNumber();
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
index ab80028..f8603c5 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
@@ -71,6 +71,11 @@ final class UnsafeSorterSpillMerger {
}
@Override
+ public long getCurrentPageNumber() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public boolean hasNext() {
return !priorityQueue.isEmpty() || (spillReader != null && spillReader.hasNext());
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index bfca670..84907bd 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -97,6 +97,11 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
}
@Override
+ public long getCurrentPageNumber() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public boolean hasNext() {
return (numRecordsRemaining > 0);
}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 411cd5c..41813dd 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -355,6 +355,39 @@ public class UnsafeExternalSorterSuite {
}
@Test
+ public void forcedSpillingNullsWithReadIterator() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
+ long[] record = new long[100];
+ final int recordSize = record.length * 8;
+ final int n = (int) pageSizeBytes / recordSize * 3;
+ for (int i = 0; i < n; i++) {
+ boolean isNull = i % 2 == 0;
+ sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0, isNull);
+ }
+ assertTrue(sorter.getNumberOfAllocatedPages() >= 2);
+
+ UnsafeExternalSorter.SpillableIterator iter =
+ (UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator();
+ final int numRecordsToReadBeforeSpilling = n / 3;
+ for (int i = 0; i < numRecordsToReadBeforeSpilling; i++) {
+ assertTrue(iter.hasNext());
+ iter.loadNext();
+ }
+
+ assertTrue(iter.spill() > 0);
+ assertEquals(0, iter.spill());
+
+ for (int i = numRecordsToReadBeforeSpilling; i < n; i++) {
+ assertTrue(iter.hasNext());
+ iter.loadNext();
+ }
+ assertFalse(iter.hasNext());
+
+ sorter.cleanupResources();
+ assertSpillFilesWereCleanedUp();
+ }
+
+ @Test
public void forcedSpillingWithNotReadIterator() throws Exception {
final UnsafeExternalSorter sorter = newSorter();
long[] record = new long[100];
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org