You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink on the word "here") was not preserved in this version.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/24 22:29:17 UTC

[incubator-iceberg] branch master updated: Use largest task first in split planning (#117)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c8df297  Use largest task first in split planning (#117)
c8df297 is described below

commit c8df297c27e9da954e4ce5bb6ea9d0d86b8bc78b
Author: Romin Parekh <ro...@gmail.com>
AuthorDate: Sun Mar 24 15:29:13 2019 -0700

    Use largest task first in split planning (#117)
---
 .../java/org/apache/iceberg/BaseTableScan.java     |  2 +-
 .../org/apache/iceberg/MergingSnapshotUpdate.java  |  2 +-
 .../java/org/apache/iceberg/util/BinPacking.java   | 44 ++++++++++++++++++----
 .../org/apache/iceberg/util/TestBinPacking.java    | 17 ++++++++-
 4 files changed, 54 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index e4c8de6..6ec9dac 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -217,7 +217,7 @@ class BaseTableScan implements TableScan {
 
     return CloseableIterable.transform(
         CloseableIterable.wrap(splitFiles(splitSize), splits ->
-            new BinPacking.PackingIterable<>(splits, splitSize, lookback, weightFunc)),
+            new BinPacking.PackingIterable<>(splits, splitSize, lookback, weightFunc, true)),
         BaseCombinedScanTask::new);
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
index 3a4d493..af873c7 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
@@ -436,7 +436,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
     // use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack
     // from the end so that the manifest that gets under-filled is the first one, which will be
     // merged the next time.
-    ListPacker<ManifestFile> packer = new ListPacker<>(manifestTargetSizeBytes, 1);
+    ListPacker<ManifestFile> packer = new ListPacker<>(manifestTargetSizeBytes, 1, false);
     List<List<ManifestFile>> bins = packer.packEnd(group, manifest -> manifest.length());
 
     // process bins in parallel, but put results in the order of the bins into an array to preserve
diff --git a/core/src/main/java/org/apache/iceberg/util/BinPacking.java b/core/src/main/java/org/apache/iceberg/util/BinPacking.java
index d99674f..7573437 100644
--- a/core/src/main/java/org/apache/iceberg/util/BinPacking.java
+++ b/core/src/main/java/org/apache/iceberg/util/BinPacking.java
@@ -23,6 +23,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -33,20 +35,22 @@ public class BinPacking {
   public static class ListPacker<T> {
     private final long targetWeight;
     private final int lookback;
+    private final boolean largestBinFirst;
 
-    public ListPacker(long targetWeight, int lookback) {
+    public ListPacker(long targetWeight, int lookback, boolean largestBinFirst) {
       this.targetWeight = targetWeight;
       this.lookback = lookback;
+      this.largestBinFirst = largestBinFirst;
     }
 
     public List<List<T>> packEnd(List<T> items, Function<T, Long> weightFunc) {
       return Lists.reverse(ImmutableList.copyOf(Iterables.transform(
-          new PackingIterable<>(Lists.reverse(items), targetWeight, lookback, weightFunc),
+          new PackingIterable<>(Lists.reverse(items), targetWeight, lookback, weightFunc, largestBinFirst),
           Lists::reverse)));
     }
 
     public List<List<T>> pack(Iterable<T> items, Function<T, Long> weightFunc) {
-      return ImmutableList.copyOf(new PackingIterable<>(items, targetWeight, lookback, weightFunc));
+      return ImmutableList.copyOf(new PackingIterable<>(items, targetWeight, lookback, weightFunc, largestBinFirst));
     }
   }
 
@@ -55,20 +59,22 @@ public class BinPacking {
     private final long targetWeight;
     private final int lookback;
     private final Function<T, Long> weightFunc;
+    private final boolean largestBinFirst;
 
     public PackingIterable(Iterable<T> iterable, long targetWeight, int lookback,
-                           Function<T, Long> weightFunc) {
+                           Function<T, Long> weightFunc, boolean largestBinFirst) {
       Preconditions.checkArgument(lookback > 0,
           "Bin look-back size must be greater than 0: %s", lookback);
       this.iterable = iterable;
       this.targetWeight = targetWeight;
       this.lookback = lookback;
       this.weightFunc = weightFunc;
+      this.largestBinFirst = largestBinFirst;
     }
 
     @Override
     public Iterator<List<T>> iterator() {
-      return new PackingIterator<>(iterable.iterator(), targetWeight, lookback, weightFunc);
+      return new PackingIterator<>(iterable.iterator(), targetWeight, lookback, weightFunc, largestBinFirst);
     }
   }
 
@@ -78,13 +84,15 @@ public class BinPacking {
     private final long targetWeight;
     private final int lookback;
     private final Function<T, Long> weightFunc;
+    private final boolean largestBinFirst;
 
     private PackingIterator(Iterator<T> items, long targetWeight, int lookback,
-                            Function<T, Long> weightFunc) {
+                            Function<T, Long> weightFunc, boolean largestBinFirst) {
       this.items = items;
       this.targetWeight = targetWeight;
       this.lookback = lookback;
       this.weightFunc = weightFunc;
+      this.largestBinFirst = largestBinFirst;
     }
 
     public boolean hasNext() {
@@ -107,7 +115,13 @@ public class BinPacking {
           bins.addLast(bin);
 
           if (bins.size() > lookback) {
-            return ImmutableList.copyOf(bins.removeFirst().items());
+            Bin binToRemove;
+            if (largestBinFirst) {
+              binToRemove = removeLargestBin(bins);
+            } else {
+              binToRemove = bins.removeFirst();
+            }
+            return ImmutableList.copyOf(binToRemove.items());
           }
         }
       }
@@ -119,6 +133,18 @@ public class BinPacking {
       return ImmutableList.copyOf(bins.removeFirst().items());
     }
 
+    private Bin removeLargestBin(LinkedList<Bin> bins) {
+      // Iterate through all bins looking for one with maximum weight, taking O(n) time.
+      Bin maxBin = Collections.max(bins, Comparator.comparingLong(Bin::weight));
+
+      // Sanity check: we have removed maxBin from list of bins.
+      if (bins.remove(maxBin)) {
+        return maxBin;
+      } else {
+        throw new NoSuchElementException();
+      }
+    }
+
     private Bin find(List<Bin> bins, long weight) {
       for (Bin bin : bins) {
         if (bin.canAdd(weight)) {
@@ -144,6 +170,10 @@ public class BinPacking {
         this.binWeight += weight;
         items.add(item);
       }
+
+      public long weight() {
+        return binWeight;
+      }
     }
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java b/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
index 6877167..06a47f8 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
@@ -162,6 +162,15 @@ public class TestBinPacking {
     // 6. [5, 1]
     Assert.assertEquals("1 bin look-back: should merge ones with fives",
         l(l(5, 1), l(5, 1), l(5, 1)), pack(l(5, 1, 5, 1, 5, 1), 8, 1));
+
+    Assert.assertEquals("2 bin look-back: should merge until targetWeight when largestBinFirst is enabled",
+        l(l(36, 36, 36), l(128), l(36, 65), l(65)),
+        pack(l(36, 36, 36, 36, 65, 65, 128), 128, 2, true));
+
+    Assert.assertEquals(
+        "1 bin look-back: should merge until targetWeight when largestBinFirst is enabled",
+        l(l(64, 64), l(128), l(32, 32, 32, 32)),
+        pack(l(64, 64, 128, 32, 32, 32, 32), 128, 1, true));
   }
 
   private List<List<Integer>> pack(List<Integer> items, long targetWeight) {
@@ -169,7 +178,11 @@ public class TestBinPacking {
   }
 
   private List<List<Integer>> pack(List<Integer> items, long targetWeight, int lookback) {
-    ListPacker<Integer> packer = new ListPacker<>(targetWeight, lookback);
+    return pack(items, targetWeight, lookback, false);
+  }
+
+  private List<List<Integer>> pack(List<Integer> items, long targetWeight, int lookback, boolean largestBinFirst) {
+    ListPacker<Integer> packer = new ListPacker<>(targetWeight, lookback, largestBinFirst);
     return packer.pack(items, Integer::longValue);
   }
 
@@ -178,7 +191,7 @@ public class TestBinPacking {
   }
 
   private List<List<Integer>> packEnd(List<Integer> items, long targetWeight, int lookback) {
-    ListPacker<Integer> packer = new ListPacker<>(targetWeight, lookback);
+    ListPacker<Integer> packer = new ListPacker<>(targetWeight, lookback, false);
     return packer.packEnd(items, Integer::longValue);
   }