You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/11 03:47:53 UTC

spark git commit: [SPARK-21315][SQL] Skip some spill files when generateIterator(startIndex) in ExternalAppendOnlyUnsafeRowArray.

Repository: spark
Updated Branches:
  refs/heads/master 833eab2c9 -> 97a1aa2c7


[SPARK-21315][SQL] Skip some spill files when generateIterator(startIndex) in ExternalAppendOnlyUnsafeRowArray.

## What changes were proposed in this pull request?

In current code, it is expensive to use `UnboundedFollowingWindowFunctionFrame`, because it is iterating from the start to lower bound every time calling `write` method. When traverse the iterator, it's possible to skip some spilled files thus to save some time.

## How was this patch tested?

Added unit test

Did a small test for benchmark:

Put 2000200 rows into `UnsafeExternalSorter`-- 2 spill files(each contains 1000000 rows) and inMemSorter contains 200 rows.
Move the iterator forward to index=2000001.

*With this change*:
`getIterator(2000001)`, it will cost almost 0ms~1ms;
*Without this change*:
`for(int i=0; i<2000001; i++)geIterator().loadNext()`, it will cost 300ms.

Author: jinxing <ji...@126.com>

Closes #18541 from jinxing64/SPARK-21315.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97a1aa2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97a1aa2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97a1aa2c

Branch: refs/heads/master
Commit: 97a1aa2c70b1bf726d5f572789e150d168ac61e5
Parents: 833eab2
Author: jinxing <ji...@126.com>
Authored: Tue Jul 11 11:47:47 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jul 11 11:47:47 2017 +0800

----------------------------------------------------------------------
 .../unsafe/sort/UnsafeExternalSorter.java       | 35 +++++++++++++++++---
 .../unsafe/sort/UnsafeSorterSpillWriter.java    |  4 +++
 .../unsafe/sort/UnsafeExternalSorterSuite.java  | 34 ++++++++++++++++++-
 .../ExternalAppendOnlyUnsafeRowArray.scala      | 22 ++----------
 ...ernalAppendOnlyUnsafeRowArrayBenchmark.scala |  2 +-
 5 files changed, 70 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 82d03e3..a6e858c 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -589,29 +589,54 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   }
 
   /**
-   * Returns a iterator, which will return the rows in the order as inserted.
+   * Returns an iterator starts from startIndex, which will return the rows in the order as
+   * inserted.
    *
    * It is the caller's responsibility to call `cleanupResources()`
    * after consuming this iterator.
    *
    * TODO: support forced spilling
    */
-  public UnsafeSorterIterator getIterator() throws IOException {
+  public UnsafeSorterIterator getIterator(int startIndex) throws IOException {
     if (spillWriters.isEmpty()) {
       assert(inMemSorter != null);
-      return inMemSorter.getSortedIterator();
+      UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
+      moveOver(iter, startIndex);
+      return iter;
     } else {
       LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
+      int i = 0;
       for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
-        queue.add(spillWriter.getReader(serializerManager));
+        if (i + spillWriter.recordsSpilled() > startIndex) {
+          UnsafeSorterIterator iter = spillWriter.getReader(serializerManager);
+          moveOver(iter, startIndex - i);
+          queue.add(iter);
+        }
+        i += spillWriter.recordsSpilled();
       }
       if (inMemSorter != null) {
-        queue.add(inMemSorter.getSortedIterator());
+        UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
+        moveOver(iter, startIndex - i);
+        queue.add(iter);
       }
       return new ChainedIterator(queue);
     }
   }
 
+  private void moveOver(UnsafeSorterIterator iter, int steps)
+      throws IOException {
+    if (steps > 0) {
+      for (int i = 0; i < steps; i++) {
+        if (iter.hasNext()) {
+          iter.loadNext();
+        } else {
+          throw new ArrayIndexOutOfBoundsException("Failed to move the iterator " + steps +
+            " steps forward");
+        }
+      }
+    }
+  }
+
   /**
    * Chain multiple UnsafeSorterIterator together as single one.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
index f9b5493..850f247 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
@@ -155,4 +155,8 @@ public final class UnsafeSorterSpillWriter {
   public UnsafeSorterSpillReader getReader(SerializerManager serializerManager) throws IOException {
     return new UnsafeSorterSpillReader(serializerManager, file, blockId);
   }
+
+  public int recordsSpilled() {
+    return numRecordsSpilled;
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index d31d7c1..cd5db1a 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -395,7 +395,7 @@ public class UnsafeExternalSorterSuite {
         sorter.spill();
       }
     }
-    UnsafeSorterIterator iter = sorter.getIterator();
+    UnsafeSorterIterator iter = sorter.getIterator(0);
     for (int i = 0; i < n; i++) {
       iter.hasNext();
       iter.loadNext();
@@ -479,5 +479,37 @@ public class UnsafeExternalSorterSuite {
     }
   }
 
+  @Test
+  public void testGetIterator() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    for (int i = 0; i < 100; i++) {
+      insertNumber(sorter, i);
+    }
+    verifyIntIterator(sorter.getIterator(0), 0, 100);
+    verifyIntIterator(sorter.getIterator(79), 79, 100);
+
+    sorter.spill();
+    for (int i = 100; i < 200; i++) {
+      insertNumber(sorter, i);
+    }
+    sorter.spill();
+    verifyIntIterator(sorter.getIterator(79), 79, 200);
+
+    for (int i = 200; i < 300; i++) {
+      insertNumber(sorter, i);
+    }
+    verifyIntIterator(sorter.getIterator(79), 79, 300);
+    verifyIntIterator(sorter.getIterator(139), 139, 300);
+    verifyIntIterator(sorter.getIterator(279), 279, 300);
+  }
+
+  private void verifyIntIterator(UnsafeSorterIterator iter, int start, int end)
+      throws IOException {
+    for (int i = start; i < end; i++) {
+      assert (iter.hasNext());
+      iter.loadNext();
+      assert (Platform.getInt(iter.getBaseObject(), iter.getBaseOffset()) == i);
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
index 458ac4b..c4d3834 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
@@ -166,7 +166,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
     if (spillableArray == null) {
       new InMemoryBufferIterator(startIndex)
     } else {
-      new SpillableArrayIterator(spillableArray.getIterator, numFieldsPerRow, startIndex)
+      new SpillableArrayIterator(spillableArray.getIterator(startIndex), numFieldsPerRow)
     }
   }
 
@@ -204,29 +204,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
 
   private[this] class SpillableArrayIterator(
       iterator: UnsafeSorterIterator,
-      numFieldPerRow: Int,
-      startIndex: Int)
+      numFieldPerRow: Int)
     extends ExternalAppendOnlyUnsafeRowArrayIterator {
 
     private val currentRow = new UnsafeRow(numFieldPerRow)
 
-    def init(): Unit = {
-      var i = 0
-      while (i < startIndex) {
-        if (iterator.hasNext) {
-          iterator.loadNext()
-        } else {
-          throw new ArrayIndexOutOfBoundsException(
-            "Invalid `startIndex` provided for generating iterator over the array. " +
-              s"Total elements: $numRows, requested `startIndex`: $startIndex")
-        }
-        i += 1
-      }
-    }
-
-    // Traverse upto the given [[startIndex]]
-    init()
-
     override def hasNext(): Boolean = !isModified() && iterator.hasNext
 
     override def next(): UnsafeRow = {

http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 00c5f25..031ac38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -130,7 +130,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
             false))
 
         val unsafeRow = new UnsafeRow(1)
-        val iter = array.getIterator
+        val iter = array.getIterator(0)
         while (iter.hasNext) {
           iter.loadNext()
           unsafeRow.pointTo(iter.getBaseObject, iter.getBaseOffset, iter.getRecordLength)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org