You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/11/29 23:54:32 UTC

[iceberg] branch master updated: API, Core: Add groupingKey to ScanTaskGroup (#6304)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f27cb333d API, Core: Add groupingKey to ScanTaskGroup (#6304)
9f27cb333d is described below

commit 9f27cb333dea397314dafc0d3e1ca7fa89b21ab3
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Nov 29 15:54:23 2022 -0800

    API, Core: Add groupingKey to ScanTaskGroup (#6304)
---
 .../java/org/apache/iceberg/BaseScanTaskGroup.java | 13 +++-
 .../{ScanTaskGroup.java => EmptyStructLike.java}   | 50 +++++++++++-----
 .../java/org/apache/iceberg/ScanTaskGroup.java     | 17 ++++++
 .../java/org/apache/iceberg/PartitionData.java     |  4 +-
 .../org/apache/iceberg/util/TableScanUtil.java     | 69 ++++++++++++++++------
 5 files changed, 117 insertions(+), 36 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/BaseScanTaskGroup.java b/api/src/main/java/org/apache/iceberg/BaseScanTaskGroup.java
index 706ca344e6..4a1363023b 100644
--- a/api/src/main/java/org/apache/iceberg/BaseScanTaskGroup.java
+++ b/api/src/main/java/org/apache/iceberg/BaseScanTaskGroup.java
@@ -26,14 +26,25 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 
 public class BaseScanTaskGroup<T extends ScanTask> implements ScanTaskGroup<T> {
+  private final StructLike groupingKey;
   private final Object[] tasks;
   private transient volatile List<T> taskList;
 
-  public BaseScanTaskGroup(Collection<T> tasks) {
+  public BaseScanTaskGroup(StructLike groupingKey, Collection<T> tasks) {
     Preconditions.checkNotNull(tasks, "tasks cannot be null");
+    this.groupingKey = groupingKey;
     this.tasks = tasks.toArray();
   }
 
+  public BaseScanTaskGroup(Collection<T> tasks) {
+    this(EmptyStructLike.get(), tasks);
+  }
+
+  @Override
+  public StructLike groupingKey() {
+    return groupingKey;
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public Collection<T> tasks() {
diff --git a/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java b/api/src/main/java/org/apache/iceberg/EmptyStructLike.java
similarity index 51%
copy from api/src/main/java/org/apache/iceberg/ScanTaskGroup.java
copy to api/src/main/java/org/apache/iceberg/EmptyStructLike.java
index 791efe35e6..2d57f4c01a 100644
--- a/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java
+++ b/api/src/main/java/org/apache/iceberg/EmptyStructLike.java
@@ -18,29 +18,49 @@
  */
 package org.apache.iceberg;
 
-import java.util.Collection;
+import java.io.Serializable;
 
-/**
- * A scan task that may include partial input files, multiple input files or both.
- *
- * @param <T> the type of scan tasks
- */
-public interface ScanTaskGroup<T extends ScanTask> extends ScanTask {
-  /** Returns scan tasks in this group. */
-  Collection<T> tasks();
+class EmptyStructLike implements StructLike, Serializable {
+
+  private static final EmptyStructLike INSTANCE = new EmptyStructLike();
+
+  private EmptyStructLike() {}
+
+  static EmptyStructLike get() {
+    return INSTANCE;
+  }
+
+  @Override
+  public int size() {
+    return 0;
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    throw new UnsupportedOperationException("Can't retrieve values from an empty struct");
+  }
 
   @Override
-  default long sizeBytes() {
-    return tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
+  public <T> void set(int pos, T value) {
+    throw new UnsupportedOperationException("Can't modify an empty struct");
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+
+    return other != null && getClass() == other.getClass();
   }
 
   @Override
-  default long estimatedRowsCount() {
-    return tasks().stream().mapToLong(ScanTask::estimatedRowsCount).sum();
+  public int hashCode() {
+    return 0;
   }
 
   @Override
-  default int filesCount() {
-    return tasks().stream().mapToInt(ScanTask::filesCount).sum();
+  public String toString() {
+    return "StructLike{}";
   }
 }
diff --git a/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java b/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java
index 791efe35e6..06189083d8 100644
--- a/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java
+++ b/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java
@@ -26,6 +26,23 @@ import java.util.Collection;
  * @param <T> the type of scan tasks
  */
 public interface ScanTaskGroup<T extends ScanTask> extends ScanTask {
+  /**
+   * Returns a grouping key for this task group.
+   *
+   * <p>A grouping key is a set of values that are common amongst all rows produced by the tasks in
+   * this task group. The values may be the result of transforming the underlying data. For example,
+   * a grouping key can consist of a bucket ordinal computed by applying a bucket transform to a
+   * column of the underlying rows. The grouping key type is determined at planning time and is
+   * identical across all task groups produced by a scan.
+   *
+   * <p>Implementations should return an empty struct if the data grouping is random or unknown.
+   *
+   * @return a grouping key for this task group
+   */
+  default StructLike groupingKey() {
+    return EmptyStructLike.get();
+  }
+
   /** Returns scan tasks in this group. */
   Collection<T> tasks();
 
diff --git a/core/src/main/java/org/apache/iceberg/PartitionData.java b/core/src/main/java/org/apache/iceberg/PartitionData.java
index afee65cd9f..639fdeacd7 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionData.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionData.java
@@ -35,7 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
-class PartitionData
+public class PartitionData
     implements IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
 
   static Schema partitionDataSchema(Types.StructType partitionType) {
@@ -57,7 +57,7 @@ class PartitionData
     this.schema = schema;
   }
 
-  PartitionData(Types.StructType partitionType) {
+  public PartitionData(Types.StructType partitionType) {
     for (Types.NestedField field : partitionType.fields()) {
       Preconditions.checkArgument(
           field.type().isPrimitiveType(),
diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index b54b06a3a6..5541468be4 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -28,11 +28,13 @@ import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MergeableScanTask;
+import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.SplittableScanTask;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
@@ -140,7 +142,7 @@ public class TableScanUtil {
       long splitSize,
       int lookback,
       long openFileCost,
-      Types.StructType projectedPartitionType) {
+      Types.StructType groupingKeyType) {
 
     Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
     Preconditions.checkArgument(
@@ -151,38 +153,69 @@ public class TableScanUtil {
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
-    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+    Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap();
 
-    // Group tasks by their partition values
-    StructLikeMap<List<T>> tasksByPartition = StructLikeMap.create(projectedPartitionType);
+    // group tasks by grouping keys derived from their partition tuples
+    StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType);
 
     for (T task : tasks) {
       PartitionSpec spec = task.spec();
-      StructProjection projectedStruct =
-          projectionsBySpec.computeIfAbsent(
+      StructLike partition = task.partition();
+      StructProjection groupingKeyProjection =
+          groupingKeyProjectionsBySpec.computeIfAbsent(
               spec.specId(),
-              specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
-      List<T> taskList =
-          tasksByPartition.computeIfAbsent(
-              projectedStruct.copyFor(task.partition()), k -> Lists.newArrayList());
+              specId -> StructProjection.create(spec.partitionType(), groupingKeyType));
+      List<T> groupingKeyTasks =
+          tasksByGroupingKey.computeIfAbsent(
+              projectGroupingKey(groupingKeyProjection, groupingKeyType, partition),
+              groupingKey -> Lists.newArrayList());
       if (task instanceof SplittableScanTask<?>) {
-        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(taskList::add);
+        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(groupingKeyTasks::add);
       } else {
-        taskList.add(task);
+        groupingKeyTasks.add(task);
       }
     }
 
-    // Now apply task combining within each partition
-    return FluentIterable.from(tasksByPartition.values())
-        .transformAndConcat(ts -> toTaskGroupIterable(ts, splitSize, lookback, weightFunc))
-        .toList();
+    List<ScanTaskGroup<T>> taskGroups = Lists.newArrayList();
+
+    for (Map.Entry<StructLike, List<T>> entry : tasksByGroupingKey.entrySet()) {
+      StructLike groupingKey = entry.getKey();
+      List<T> groupingKeyTasks = entry.getValue();
+      Iterables.addAll(
+          taskGroups,
+          toTaskGroupIterable(groupingKey, groupingKeyTasks, splitSize, lookback, weightFunc));
+    }
+
+    return taskGroups;
+  }
+
+  private static StructLike projectGroupingKey(
+      StructProjection groupingKeyProjection,
+      Types.StructType groupingKeyType,
+      StructLike partition) {
+
+    PartitionData groupingKey = new PartitionData(groupingKeyType);
+
+    groupingKeyProjection.wrap(partition);
+
+    for (int pos = 0; pos < groupingKeyProjection.size(); pos++) {
+      Class<?> javaClass = groupingKey.getType(pos).typeId().javaClass();
+      groupingKey.set(pos, groupingKeyProjection.get(pos, javaClass));
+    }
+
+    return groupingKey;
   }
 
   private static <T extends ScanTask> Iterable<ScanTaskGroup<T>> toTaskGroupIterable(
-      Iterable<T> tasks, long splitSize, int lookback, Function<T, Long> weightFunc) {
+      StructLike groupingKey,
+      Iterable<T> tasks,
+      long splitSize,
+      int lookback,
+      Function<T, Long> weightFunc) {
+
     return Iterables.transform(
         new BinPacking.PackingIterable<>(tasks, splitSize, lookback, weightFunc, true),
-        combinedTasks -> new BaseScanTaskGroup<>(mergeTasks(combinedTasks)));
+        combinedTasks -> new BaseScanTaskGroup<>(groupingKey, mergeTasks(combinedTasks)));
   }
 
   @SuppressWarnings("unchecked")