You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "rdblue (via GitHub)" <gi...@apache.org> on 2023/05/28 23:44:29 UTC

[GitHub] [iceberg] rdblue commented on a diff in pull request #7731: Core: Implement adaptive split planning in core.

rdblue commented on code in PR #7731:
URL: https://github.com/apache/iceberg/pull/7731#discussion_r1208697877


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -251,4 +322,126 @@ private static void validatePlanningArguments(long splitSize, int lookback, long
     Preconditions.checkArgument(lookback > 0, "Split planning lookback must be > 0: %s", lookback);
     Preconditions.checkArgument(openFileCost >= 0, "File open cost must be >= 0: %s", openFileCost);
   }
+
+  private static <T extends ScanTask, G extends ScanTaskGroup<T>>
+      CloseableIterable<G> planTasksInternal(
+          CloseableIterable<T> splitFiles,
+          long splitSize,
+          int lookback,
+          Function<T, Long> weightFunc,
+          Function<List<T>, G> groupFunc) {
+    /*
+     * Shared planning step: wraps splitFiles in a BinPacking.PackingIterable configured with
+     * splitSize, lookback and weightFunc, combines that iterable with splitFiles, and maps each
+     * packed element through groupFunc to produce the returned task groups.
+     *
+     * NOTE(review): combine(..., splitFiles) presumably ties closing of the result to closing
+     * splitFiles -- confirm against CloseableIterable.combine.
+     * NOTE(review): the trailing `true` is a PackingIterable option whose name is not visible
+     * here (presumably largest-bin-first) -- confirm against the BinPacking constructor.
+     */
+    return CloseableIterable.transform(
+        CloseableIterable.combine(
+            new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, weightFunc, true),
+            splitFiles),
+        groupFunc);
+  }
+
+  private static class AdaptiveSplitPlanningIterable<T extends ScanTask, G extends ScanTaskGroup<T>>
+      extends CloseableGroup implements CloseableIterable<G> {
+    private final CloseableIterable<T> files;
+    private final int parallelism;
+    private final long splitSize;
+    private final int lookback;
+    private final Function<T, Long> weightFunc;
+    private final BiFunction<CloseableIterable<T>, Long, CloseableIterable<T>> splitFunc;
+    private final Function<List<T>, G> groupFunc;
+
+    private Long targetSize = null;
+
+    private AdaptiveSplitPlanningIterable( // stores the planning inputs; no work is done here
+        CloseableIterable<T> files,
+        int parallelism,
+        long splitSize,
+        int lookback,
+        Function<T, Long> weightFunc,
+        BiFunction<CloseableIterable<T>, Long, CloseableIterable<T>> splitFunc,
+        Function<List<T>, G> groupFunc) {
+      this.files = files; // only the reference is kept; iteration starts in iterator()
+      this.parallelism = parallelism; // NOTE(review): divisor in iterator(); not checked > 0 here
+      this.splitSize = splitSize; // requested split size; upper bound for targetSize in iterator()
+      this.lookback = lookback; // forwarded unchanged to planTasksInternal
+      this.weightFunc = weightFunc; // per-task weight; summed during read-ahead in iterator()
+      this.splitFunc = splitFunc; // applied as splitFunc.apply(files, targetSize) in iterator()
+      this.groupFunc = groupFunc; // forwarded unchanged to planTasksInternal
+    }
+
+    @Override
+    public CloseableIterator<G> iterator() {
+      if (targetSize != null) {
+        // target size is already known so plan with the static target size
+        CloseableIterable<T> splitTasks = splitFunc.apply(files, targetSize);
+        CloseableIterator<G> iter =
+            planTasksInternal(splitTasks, targetSize, lookback, weightFunc, groupFunc).iterator();
+        addCloseable(iter);
+        return iter;
+      }
+
+      boolean shouldClose = true;
+      CloseableIterator<T> tasksIter = files.iterator();
+      try {
+        // load tasks until the iterator is exhausted or until the total weight is enough to get the
+        // parallelism at the split size passed in.
+        LinkedList<T> readAheadTasks = Lists.newLinkedList();
+        long readToSize = parallelism * splitSize;
+        long totalSize = 0L;
+
+        while (tasksIter.hasNext()) {
+          T task = tasksIter.next();
+          readAheadTasks.addLast(task);
+          totalSize += weightFunc.apply(task);
+
+          if (totalSize > readToSize) {
+            break;
+          }
+        }
+
+        // if total size was reached, then the requested split size is used. otherwise, the iterator
+        // was exhausted and the split size will be adjusted to target parallelism with a reasonable
+        // minimum.
+        this.targetSize = Math.max(MIN_SPLIT_SIZE, Math.min(totalSize / parallelism, splitSize));

Review Comment:
   This is where the split size is determined.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org