You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/24 22:29:17 UTC
[incubator-iceberg] branch master updated: Use largest task first in split planning (#117)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c8df297 Use largest task first in split planning (#117)
c8df297 is described below
commit c8df297c27e9da954e4ce5bb6ea9d0d86b8bc78b
Author: Romin Parekh <ro...@gmail.com>
AuthorDate: Sun Mar 24 15:29:13 2019 -0700
Use largest task first in split planning (#117)
---
.../java/org/apache/iceberg/BaseTableScan.java | 2 +-
.../org/apache/iceberg/MergingSnapshotUpdate.java | 2 +-
.../java/org/apache/iceberg/util/BinPacking.java | 44 ++++++++++++++++++----
.../org/apache/iceberg/util/TestBinPacking.java | 17 ++++++++-
4 files changed, 54 insertions(+), 11 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index e4c8de6..6ec9dac 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -217,7 +217,7 @@ class BaseTableScan implements TableScan {
return CloseableIterable.transform(
CloseableIterable.wrap(splitFiles(splitSize), splits ->
- new BinPacking.PackingIterable<>(splits, splitSize, lookback, weightFunc)),
+ new BinPacking.PackingIterable<>(splits, splitSize, lookback, weightFunc, true)),
BaseCombinedScanTask::new);
}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
index 3a4d493..af873c7 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
@@ -436,7 +436,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
// use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack
// from the end so that the manifest that gets under-filled is the first one, which will be
// merged the next time.
- ListPacker<ManifestFile> packer = new ListPacker<>(manifestTargetSizeBytes, 1);
+ ListPacker<ManifestFile> packer = new ListPacker<>(manifestTargetSizeBytes, 1, false);
List<List<ManifestFile>> bins = packer.packEnd(group, manifest -> manifest.length());
// process bins in parallel, but put results in the order of the bins into an array to preserve
diff --git a/core/src/main/java/org/apache/iceberg/util/BinPacking.java b/core/src/main/java/org/apache/iceberg/util/BinPacking.java
index d99674f..7573437 100644
--- a/core/src/main/java/org/apache/iceberg/util/BinPacking.java
+++ b/core/src/main/java/org/apache/iceberg/util/BinPacking.java
@@ -23,6 +23,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -33,20 +35,22 @@ public class BinPacking {
public static class ListPacker<T> {
private final long targetWeight;
private final int lookback;
+ private final boolean largestBinFirst;
- public ListPacker(long targetWeight, int lookback) {
+ public ListPacker(long targetWeight, int lookback, boolean largestBinFirst) {
this.targetWeight = targetWeight;
this.lookback = lookback;
+ this.largestBinFirst = largestBinFirst;
}
public List<List<T>> packEnd(List<T> items, Function<T, Long> weightFunc) {
return Lists.reverse(ImmutableList.copyOf(Iterables.transform(
- new PackingIterable<>(Lists.reverse(items), targetWeight, lookback, weightFunc),
+ new PackingIterable<>(Lists.reverse(items), targetWeight, lookback, weightFunc, largestBinFirst),
Lists::reverse)));
}
public List<List<T>> pack(Iterable<T> items, Function<T, Long> weightFunc) {
- return ImmutableList.copyOf(new PackingIterable<>(items, targetWeight, lookback, weightFunc));
+ return ImmutableList.copyOf(new PackingIterable<>(items, targetWeight, lookback, weightFunc, largestBinFirst));
}
}
@@ -55,20 +59,22 @@ public class BinPacking {
private final long targetWeight;
private final int lookback;
private final Function<T, Long> weightFunc;
+ private final boolean largestBinFirst;
public PackingIterable(Iterable<T> iterable, long targetWeight, int lookback,
- Function<T, Long> weightFunc) {
+ Function<T, Long> weightFunc, boolean largestBinFirst) {
Preconditions.checkArgument(lookback > 0,
"Bin look-back size must be greater than 0: %s", lookback);
this.iterable = iterable;
this.targetWeight = targetWeight;
this.lookback = lookback;
this.weightFunc = weightFunc;
+ this.largestBinFirst = largestBinFirst;
}
@Override
public Iterator<List<T>> iterator() {
- return new PackingIterator<>(iterable.iterator(), targetWeight, lookback, weightFunc);
+ return new PackingIterator<>(iterable.iterator(), targetWeight, lookback, weightFunc, largestBinFirst);
}
}
@@ -78,13 +84,15 @@ public class BinPacking {
private final long targetWeight;
private final int lookback;
private final Function<T, Long> weightFunc;
+ private final boolean largestBinFirst;
private PackingIterator(Iterator<T> items, long targetWeight, int lookback,
- Function<T, Long> weightFunc) {
+ Function<T, Long> weightFunc, boolean largestBinFirst) {
this.items = items;
this.targetWeight = targetWeight;
this.lookback = lookback;
this.weightFunc = weightFunc;
+ this.largestBinFirst = largestBinFirst;
}
public boolean hasNext() {
@@ -107,7 +115,13 @@ public class BinPacking {
bins.addLast(bin);
if (bins.size() > lookback) {
- return ImmutableList.copyOf(bins.removeFirst().items());
+ Bin binToRemove;
+ if (largestBinFirst) {
+ binToRemove = removeLargestBin(bins);
+ } else {
+ binToRemove = bins.removeFirst();
+ }
+ return ImmutableList.copyOf(binToRemove.items());
}
}
}
@@ -119,6 +133,18 @@ public class BinPacking {
return ImmutableList.copyOf(bins.removeFirst().items());
}
+ private Bin removeLargestBin(LinkedList<Bin> bins) {
+ // Iterate through all bins looking for one with maximum weight, taking O(n) time.
+ Bin maxBin = Collections.max(bins, Comparator.comparingLong(Bin::weight));
+
+ // Sanity check: we have removed maxBin from list of bins.
+ if (bins.remove(maxBin)) {
+ return maxBin;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
private Bin find(List<Bin> bins, long weight) {
for (Bin bin : bins) {
if (bin.canAdd(weight)) {
@@ -144,6 +170,10 @@ public class BinPacking {
this.binWeight += weight;
items.add(item);
}
+
+ public long weight() {
+ return binWeight;
+ }
}
}
}
diff --git a/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java b/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
index 6877167..06a47f8 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
@@ -162,6 +162,15 @@ public class TestBinPacking {
// 6. [5, 1]
Assert.assertEquals("1 bin look-back: should merge ones with fives",
l(l(5, 1), l(5, 1), l(5, 1)), pack(l(5, 1, 5, 1, 5, 1), 8, 1));
+
+ Assert.assertEquals("2 bin look-back: should merge until targetWeight when largestBinFirst is enabled",
+ l(l(36, 36, 36), l(128), l(36, 65), l(65)),
+ pack(l(36, 36, 36, 36, 65, 65, 128), 128, 2, true));
+
+ Assert.assertEquals(
+ "1 bin look-back: should merge until targetWeight when largestBinFirst is enabled",
+ l(l(64, 64), l(128), l(32, 32, 32, 32)),
+ pack(l(64, 64, 128, 32, 32, 32, 32), 128, 1, true));
}
private List<List<Integer>> pack(List<Integer> items, long targetWeight) {
@@ -169,7 +178,11 @@ public class TestBinPacking {
}
private List<List<Integer>> pack(List<Integer> items, long targetWeight, int lookback) {
- ListPacker<Integer> packer = new ListPacker<>(targetWeight, lookback);
+ return pack(items, targetWeight, lookback, false);
+ }
+
+ private List<List<Integer>> pack(List<Integer> items, long targetWeight, int lookback, boolean largestBinFirst) {
+ ListPacker<Integer> packer = new ListPacker<>(targetWeight, lookback, largestBinFirst);
return packer.pack(items, Integer::longValue);
}
@@ -178,7 +191,7 @@ public class TestBinPacking {
}
private List<List<Integer>> packEnd(List<Integer> items, long targetWeight, int lookback) {
- ListPacker<Integer> packer = new ListPacker<>(targetWeight, lookback);
+ ListPacker<Integer> packer = new ListPacker<>(targetWeight, lookback, false);
return packer.packEnd(items, Integer::longValue);
}