You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/04 18:29:39 UTC

[incubator-iceberg] branch master updated: Plan splits using both offsets and target size (#204)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cde609  Plan splits using both offsets and target size (#204)
7cde609 is described below

commit 7cde6091d8970f70fecee29d39797a9d562ff9f4
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Tue Jun 4 11:29:34 2019 -0700

    Plan splits using both offsets and target size (#204)
---
 .../java/org/apache/iceberg/BaseFileScanTask.java  | 54 ++++++++++++++++------
 .../TestOffsetsBasedSplitScanTaskIterator.java     | 33 ++++++++-----
 2 files changed, 61 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
index b90805c..77ae7a6 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
@@ -22,8 +22,10 @@ package org.apache.iceberg;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 
@@ -71,12 +73,12 @@ class BaseFileScanTask implements FileScanTask {
   }
 
   @Override
-  public Iterable<FileScanTask> split(long splitSize) {
+  public Iterable<FileScanTask> split(long targetSplitSize) {
     if (file.format().isSplittable()) {
       if (file.splitOffsets() != null) {
-        return () -> new OffsetsBasedSplitScanTaskIterator(file.splitOffsets(), this);
+        return () -> new OffsetsAwareTargetSplitSizeScanTaskIterator(file.splitOffsets(), this, targetSplitSize);
       } else {
-        return () -> new FixedSizeSplitScanTaskIterator(splitSize, this);
+        return () -> new FixedSizeSplitScanTaskIterator(targetSplitSize, this);
       }
     }
     return ImmutableList.of(this);
@@ -91,29 +93,51 @@ class BaseFileScanTask implements FileScanTask {
         .toString();
   }
 
+  /**
+   * Returns {@link FileScanTask}s aligned to split offsets, merging adjacent ranges up to the target split size.
+   */
   @VisibleForTesting
-  static final class OffsetsBasedSplitScanTaskIterator implements Iterator<FileScanTask> {
-    private final List<Long> splitOffsets;
+  static final class OffsetsAwareTargetSplitSizeScanTaskIterator implements Iterator<FileScanTask> {
+    private final List<Long> offsets;
+    private final List<Long> splitSizes;
     private final FileScanTask parentScanTask;
-    private int idx = 0;
-
-    OffsetsBasedSplitScanTaskIterator(List<Long> splitOffsets, FileScanTask fileScanTask) {
-      this.splitOffsets = splitOffsets;
-      this.parentScanTask = fileScanTask;
+    private final long targetSplitSize;
+    private int sizeIdx = 0;
+
+    OffsetsAwareTargetSplitSizeScanTaskIterator(
+        List<Long> offsetList, FileScanTask parentScanTask, long targetSplitSize) {
+      this.offsets = ImmutableList.copyOf(offsetList);
+      this.parentScanTask = parentScanTask;
+      this.targetSplitSize = targetSplitSize;
+      this.splitSizes = new ArrayList<>(offsets.size());
+      int lastIndex = offsets.size() - 1;
+      for (int index = 0; index < lastIndex; index++) {
+        splitSizes.add(offsets.get(index + 1) - offsets.get(index));
+      }
+      splitSizes.add(parentScanTask.length() - offsets.get(lastIndex));
     }
 
     @Override
     public boolean hasNext() {
-      return idx < splitOffsets.size();
+      return sizeIdx < splitSizes.size();
     }
 
     @Override
     public FileScanTask next() {
-      long start = splitOffsets.get(idx);
-      idx++;
-      long end = hasNext() ? splitOffsets.get(idx) : parentScanTask.length();
-      return new SplitScanTask(start, end - start, parentScanTask);
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      int offsetIdx = sizeIdx;
+      long currentSize = splitSizes.get(sizeIdx);
+      sizeIdx += 1; // always consume at least one file split
+      while (sizeIdx < splitSizes.size() && currentSize + splitSizes.get(sizeIdx) <= targetSplitSize) {
+        currentSize += splitSizes.get(sizeIdx);
+        sizeIdx += 1;
+      }
+      FileScanTask combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize, parentScanTask);
+      return combinedTask;
     }
+
   }
 
   @VisibleForTesting
diff --git a/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java b/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java
index 0b70ebb..f5524c8 100644
--- a/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java
+++ b/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java
@@ -28,23 +28,34 @@ public class TestOffsetsBasedSplitScanTaskIterator {
   @Test
   public void testSplits() {
     // case when the last row group has more than one byte
-    verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 48L, asList(
-        asList(4L, 6L), asList(10L, 5L), asList(15L, 3L), asList(18L, 12L), asList(30L, 15L),
-        asList(45L, 3L)));
+    verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 48L, 20, asList(
+        asList(4L, 14L), asList(18L, 12L), asList(30L, 18L)));
 
     // case when the last row group has 1 byte
-    verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 46L, asList(
-        asList(4L, 6L), asList(10L, 5L), asList(15L, 3L), asList(18L, 12L), asList(30L, 15L),
-        asList(45L, 1L)));
+    verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 46L, 20, asList(
+        asList(4L, 14L), asList(18L, 12L), asList(30L, 16L)));
 
-    // case when there is only one row group
-    verify(asList(4L), 48L, asList(
-        asList(4L, 44L)));
+    // case when every row group is of target split size
+    verify(asList(4L, 24L, 44L, 64L, 84L, 104L), 124L, 20, asList(
+        asList(4L, 20L), asList(24L, 20L), asList(44L, 20L),
+        asList(64L, 20L), asList(84L, 20L), asList(104L, 20L)));
+
+    // case when every row group except last one is of target split size
+    verify(asList(4L, 24L, 44L, 64L, 84L, 104L), 108L, 20, asList(
+        asList(4L, 20L), asList(24L, 20L), asList(44L, 20L),
+        asList(64L, 20L), asList(84L, 20L), asList(104L, 4L)));
+
+    // case when target split size is smaller than splits determined by offset boundaries
+    verify(asList(4L, 24L, 44L, 64L, 84L, 104L), 108L, 2, asList(
+        asList(4L, 20L), asList(24L, 20L), asList(44L, 20L),
+        asList(64L, 20L), asList(84L, 20L), asList(104L, 4L)));
   }
 
-  private static void verify(List<Long> offsetRanges, long fileLen, List<List<Long>> offsetLenPairs) {
+  private static void verify(List<Long> offsetRanges, long fileLen,
+                             long targetSplitSize, List<List<Long>> offsetLenPairs) {
     List<FileScanTask> tasks = Lists.newArrayList(
-            new BaseFileScanTask.OffsetsBasedSplitScanTaskIterator(offsetRanges, new MockFileScanTask(fileLen)));
+        new BaseFileScanTask.OffsetsAwareTargetSplitSizeScanTaskIterator(
+            offsetRanges, new MockFileScanTask(fileLen), targetSplitSize));
     Assert.assertEquals("Number of tasks don't match", offsetLenPairs.size(), tasks.size());
     for (int i = 0; i < tasks.size(); i++) {
       FileScanTask task = tasks.get(i);