You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/04 18:29:39 UTC
[incubator-iceberg] branch master updated: Plan splits using both offsets and target size (#204)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7cde609 Plan splits using both offsets and target size (#204)
7cde609 is described below
commit 7cde6091d8970f70fecee29d39797a9d562ff9f4
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Tue Jun 4 11:29:34 2019 -0700
Plan splits using both offsets and target size (#204)
---
.../java/org/apache/iceberg/BaseFileScanTask.java | 54 ++++++++++++++++------
.../TestOffsetsBasedSplitScanTaskIterator.java | 33 ++++++++-----
2 files changed, 61 insertions(+), 26 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
index b90805c..77ae7a6 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
@@ -22,8 +22,10 @@ package org.apache.iceberg;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ResidualEvaluator;
@@ -71,12 +73,12 @@ class BaseFileScanTask implements FileScanTask {
}
@Override
- public Iterable<FileScanTask> split(long splitSize) {
+ public Iterable<FileScanTask> split(long targetSplitSize) {
if (file.format().isSplittable()) {
if (file.splitOffsets() != null) {
- return () -> new OffsetsBasedSplitScanTaskIterator(file.splitOffsets(), this);
+ return () -> new OffsetsAwareTargetSplitSizeScanTaskIterator(file.splitOffsets(), this, targetSplitSize);
} else {
- return () -> new FixedSizeSplitScanTaskIterator(splitSize, this);
+ return () -> new FixedSizeSplitScanTaskIterator(targetSplitSize, this);
}
}
return ImmutableList.of(this);
@@ -91,29 +93,51 @@ class BaseFileScanTask implements FileScanTask {
.toString();
}
+ /**
+ * This iterator returns {@link FileScanTask} using guidance provided by split offsets.
+ */
@VisibleForTesting
- static final class OffsetsBasedSplitScanTaskIterator implements Iterator<FileScanTask> {
- private final List<Long> splitOffsets;
+ static final class OffsetsAwareTargetSplitSizeScanTaskIterator implements Iterator<FileScanTask> {
+ private final List<Long> offsets;
+ private final List<Long> splitSizes;
private final FileScanTask parentScanTask;
- private int idx = 0;
-
- OffsetsBasedSplitScanTaskIterator(List<Long> splitOffsets, FileScanTask fileScanTask) {
- this.splitOffsets = splitOffsets;
- this.parentScanTask = fileScanTask;
+ private final long targetSplitSize;
+ private int sizeIdx = 0;
+
+ OffsetsAwareTargetSplitSizeScanTaskIterator(
+ List<Long> offsetList, FileScanTask parentScanTask, long targetSplitSize) {
+ this.offsets = ImmutableList.copyOf(offsetList);
+ this.parentScanTask = parentScanTask;
+ this.targetSplitSize = targetSplitSize;
+ this.splitSizes = new ArrayList<>(offsets.size());
+ int lastIndex = offsets.size() - 1;
+ for (int index = 0; index < lastIndex; index++) {
+ splitSizes.add(offsets.get(index + 1) - offsets.get(index));
+ }
+ splitSizes.add(parentScanTask.length() - offsets.get(lastIndex));
}
@Override
public boolean hasNext() {
- return idx < splitOffsets.size();
+ return sizeIdx < splitSizes.size();
}
@Override
public FileScanTask next() {
- long start = splitOffsets.get(idx);
- idx++;
- long end = hasNext() ? splitOffsets.get(idx) : parentScanTask.length();
- return new SplitScanTask(start, end - start, parentScanTask);
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ int offsetIdx = sizeIdx;
+ long currentSize = splitSizes.get(sizeIdx);
+ sizeIdx += 1; // always consume at least one file split
+ while (sizeIdx < splitSizes.size() && currentSize + splitSizes.get(sizeIdx) <= targetSplitSize) {
+ currentSize += splitSizes.get(sizeIdx);
+ sizeIdx += 1;
+ }
+ FileScanTask combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize, parentScanTask);
+ return combinedTask;
}
+
}
@VisibleForTesting
diff --git a/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java b/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java
index 0b70ebb..f5524c8 100644
--- a/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java
+++ b/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java
@@ -28,23 +28,34 @@ public class TestOffsetsBasedSplitScanTaskIterator {
@Test
public void testSplits() {
// case when the last row group has more than one byte
- verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 48L, asList(
- asList(4L, 6L), asList(10L, 5L), asList(15L, 3L), asList(18L, 12L), asList(30L, 15L),
- asList(45L, 3L)));
+ verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 48L, 20, asList(
+ asList(4L, 14L), asList(18L, 12L), asList(30L, 18L)));
// case when the last row group has 1 byte
- verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 46L, asList(
- asList(4L, 6L), asList(10L, 5L), asList(15L, 3L), asList(18L, 12L), asList(30L, 15L),
- asList(45L, 1L)));
+ verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 46L, 20, asList(
+ asList(4L, 14L), asList(18L, 12L), asList(30L, 16L)));
- // case when there is only one row group
- verify(asList(4L), 48L, asList(
- asList(4L, 44L)));
+ // case when every row group is of target split size
+ verify(asList(4L, 24L, 44L, 64L, 84L, 104L), 124L, 20, asList(
+ asList(4L, 20L), asList(24L, 20L), asList(44L, 20L),
+ asList(64L, 20L), asList(84L, 20L), asList(104L, 20L)));
+
+ // case when every row group except last one is of target split size
+ verify(asList(4L, 24L, 44L, 64L, 84L, 104L), 108L, 20, asList(
+ asList(4L, 20L), asList(24L, 20L), asList(44L, 20L),
+ asList(64L, 20L), asList(84L, 20L), asList(104L, 4L)));
+
+ // case when target split size is smaller than splits determined by offset boundaries
+ verify(asList(4L, 24L, 44L, 64L, 84L, 104L), 108L, 2, asList(
+ asList(4L, 20L), asList(24L, 20L), asList(44L, 20L),
+ asList(64L, 20L), asList(84L, 20L), asList(104L, 4L)));
}
- private static void verify(List<Long> offsetRanges, long fileLen, List<List<Long>> offsetLenPairs) {
+ private static void verify(List<Long> offsetRanges, long fileLen,
+ long targetSplitSize, List<List<Long>> offsetLenPairs) {
List<FileScanTask> tasks = Lists.newArrayList(
- new BaseFileScanTask.OffsetsBasedSplitScanTaskIterator(offsetRanges, new MockFileScanTask(fileLen)));
+ new BaseFileScanTask.OffsetsAwareTargetSplitSizeScanTaskIterator(
+ offsetRanges, new MockFileScanTask(fileLen), targetSplitSize));
Assert.assertEquals("Number of tasks don't match", offsetLenPairs.size(), tasks.size());
for (int i = 0; i < tasks.size(); i++) {
FileScanTask task = tasks.get(i);