You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/11 03:47:53 UTC
spark git commit: [SPARK-21315][SQL] Skip some spill files when
generateIterator(startIndex) in ExternalAppendOnlyUnsafeRowArray.
Repository: spark
Updated Branches:
refs/heads/master 833eab2c9 -> 97a1aa2c7
[SPARK-21315][SQL] Skip some spill files when generateIterator(startIndex) in ExternalAppendOnlyUnsafeRowArray.
## What changes were proposed in this pull request?
In the current code, it is expensive to use `UnboundedFollowingWindowFunctionFrame`, because it iterates from the start to the lower bound every time the `write` method is called. When traversing the iterator, it is possible to skip some spilled files and thus save some time.
## How was this patch tested?
Added unit test
Also ran a small benchmark:
Put 2000200 rows into `UnsafeExternalSorter` -- 2 spill files (each containing 1000000 rows), and inMemSorter contains 200 rows.
Move the iterator forward to index=2000001.
*With this change*:
`getIterator(2000001)` costs almost 0ms~1ms;
*Without this change*:
`for(int i=0; i<2000001; i++)getIterator().loadNext()` costs 300ms.
Author: jinxing <ji...@126.com>
Closes #18541 from jinxing64/SPARK-21315.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97a1aa2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97a1aa2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97a1aa2c
Branch: refs/heads/master
Commit: 97a1aa2c70b1bf726d5f572789e150d168ac61e5
Parents: 833eab2
Author: jinxing <ji...@126.com>
Authored: Tue Jul 11 11:47:47 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jul 11 11:47:47 2017 +0800
----------------------------------------------------------------------
.../unsafe/sort/UnsafeExternalSorter.java | 35 +++++++++++++++++---
.../unsafe/sort/UnsafeSorterSpillWriter.java | 4 +++
.../unsafe/sort/UnsafeExternalSorterSuite.java | 34 ++++++++++++++++++-
.../ExternalAppendOnlyUnsafeRowArray.scala | 22 ++----------
...ernalAppendOnlyUnsafeRowArrayBenchmark.scala | 2 +-
5 files changed, 70 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 82d03e3..a6e858c 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -589,29 +589,54 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
/**
- * Returns a iterator, which will return the rows in the order as inserted.
+ * Returns an iterator starts from startIndex, which will return the rows in the order as
+ * inserted.
*
* It is the caller's responsibility to call `cleanupResources()`
* after consuming this iterator.
*
* TODO: support forced spilling
*/
- public UnsafeSorterIterator getIterator() throws IOException {
+ public UnsafeSorterIterator getIterator(int startIndex) throws IOException {
if (spillWriters.isEmpty()) {
assert(inMemSorter != null);
- return inMemSorter.getSortedIterator();
+ UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
+ moveOver(iter, startIndex);
+ return iter;
} else {
LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
+ int i = 0;
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
- queue.add(spillWriter.getReader(serializerManager));
+ if (i + spillWriter.recordsSpilled() > startIndex) {
+ UnsafeSorterIterator iter = spillWriter.getReader(serializerManager);
+ moveOver(iter, startIndex - i);
+ queue.add(iter);
+ }
+ i += spillWriter.recordsSpilled();
}
if (inMemSorter != null) {
- queue.add(inMemSorter.getSortedIterator());
+ UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
+ moveOver(iter, startIndex - i);
+ queue.add(iter);
}
return new ChainedIterator(queue);
}
}
+ private void moveOver(UnsafeSorterIterator iter, int steps)
+ throws IOException {
+ if (steps > 0) {
+ for (int i = 0; i < steps; i++) {
+ if (iter.hasNext()) {
+ iter.loadNext();
+ } else {
+ throw new ArrayIndexOutOfBoundsException("Failed to move the iterator " + steps +
+ " steps forward");
+ }
+ }
+ }
+ }
+
/**
* Chain multiple UnsafeSorterIterator together as single one.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
index f9b5493..850f247 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
@@ -155,4 +155,8 @@ public final class UnsafeSorterSpillWriter {
public UnsafeSorterSpillReader getReader(SerializerManager serializerManager) throws IOException {
return new UnsafeSorterSpillReader(serializerManager, file, blockId);
}
+
+ public int recordsSpilled() {
+ return numRecordsSpilled;
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index d31d7c1..cd5db1a 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -395,7 +395,7 @@ public class UnsafeExternalSorterSuite {
sorter.spill();
}
}
- UnsafeSorterIterator iter = sorter.getIterator();
+ UnsafeSorterIterator iter = sorter.getIterator(0);
for (int i = 0; i < n; i++) {
iter.hasNext();
iter.loadNext();
@@ -479,5 +479,37 @@ public class UnsafeExternalSorterSuite {
}
}
+ @Test
+ public void testGetIterator() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
+ for (int i = 0; i < 100; i++) {
+ insertNumber(sorter, i);
+ }
+ verifyIntIterator(sorter.getIterator(0), 0, 100);
+ verifyIntIterator(sorter.getIterator(79), 79, 100);
+
+ sorter.spill();
+ for (int i = 100; i < 200; i++) {
+ insertNumber(sorter, i);
+ }
+ sorter.spill();
+ verifyIntIterator(sorter.getIterator(79), 79, 200);
+
+ for (int i = 200; i < 300; i++) {
+ insertNumber(sorter, i);
+ }
+ verifyIntIterator(sorter.getIterator(79), 79, 300);
+ verifyIntIterator(sorter.getIterator(139), 139, 300);
+ verifyIntIterator(sorter.getIterator(279), 279, 300);
+ }
+
+ private void verifyIntIterator(UnsafeSorterIterator iter, int start, int end)
+ throws IOException {
+ for (int i = start; i < end; i++) {
+ assert (iter.hasNext());
+ iter.loadNext();
+ assert (Platform.getInt(iter.getBaseObject(), iter.getBaseOffset()) == i);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
index 458ac4b..c4d3834 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
@@ -166,7 +166,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
if (spillableArray == null) {
new InMemoryBufferIterator(startIndex)
} else {
- new SpillableArrayIterator(spillableArray.getIterator, numFieldsPerRow, startIndex)
+ new SpillableArrayIterator(spillableArray.getIterator(startIndex), numFieldsPerRow)
}
}
@@ -204,29 +204,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
private[this] class SpillableArrayIterator(
iterator: UnsafeSorterIterator,
- numFieldPerRow: Int,
- startIndex: Int)
+ numFieldPerRow: Int)
extends ExternalAppendOnlyUnsafeRowArrayIterator {
private val currentRow = new UnsafeRow(numFieldPerRow)
- def init(): Unit = {
- var i = 0
- while (i < startIndex) {
- if (iterator.hasNext) {
- iterator.loadNext()
- } else {
- throw new ArrayIndexOutOfBoundsException(
- "Invalid `startIndex` provided for generating iterator over the array. " +
- s"Total elements: $numRows, requested `startIndex`: $startIndex")
- }
- i += 1
- }
- }
-
- // Traverse upto the given [[startIndex]]
- init()
-
override def hasNext(): Boolean = !isModified() && iterator.hasNext
override def next(): UnsafeRow = {
http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 00c5f25..031ac38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -130,7 +130,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
false))
val unsafeRow = new UnsafeRow(1)
- val iter = array.getIterator
+ val iter = array.getIterator(0)
while (iter.hasNext) {
iter.loadNext()
unsafeRow.pointTo(iter.getBaseObject, iter.getBaseOffset, iter.getRecordLength)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org