You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/11/29 23:54:32 UTC
[iceberg] branch master updated: API, Core: Add groupingKey to ScanTaskGroup (#6304)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9f27cb333d API, Core: Add groupingKey to ScanTaskGroup (#6304)
9f27cb333d is described below
commit 9f27cb333dea397314dafc0d3e1ca7fa89b21ab3
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Nov 29 15:54:23 2022 -0800
API, Core: Add groupingKey to ScanTaskGroup (#6304)
---
.../java/org/apache/iceberg/BaseScanTaskGroup.java | 13 +++-
.../{ScanTaskGroup.java => EmptyStructLike.java} | 50 +++++++++++-----
.../java/org/apache/iceberg/ScanTaskGroup.java | 17 ++++++
.../java/org/apache/iceberg/PartitionData.java | 4 +-
.../org/apache/iceberg/util/TableScanUtil.java | 69 ++++++++++++++++------
5 files changed, 117 insertions(+), 36 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/BaseScanTaskGroup.java b/api/src/main/java/org/apache/iceberg/BaseScanTaskGroup.java
index 706ca344e6..4a1363023b 100644
--- a/api/src/main/java/org/apache/iceberg/BaseScanTaskGroup.java
+++ b/api/src/main/java/org/apache/iceberg/BaseScanTaskGroup.java
@@ -26,14 +26,25 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
public class BaseScanTaskGroup<T extends ScanTask> implements ScanTaskGroup<T> {
+ private final StructLike groupingKey;
private final Object[] tasks;
private transient volatile List<T> taskList;
- public BaseScanTaskGroup(Collection<T> tasks) {
+ public BaseScanTaskGroup(StructLike groupingKey, Collection<T> tasks) {
Preconditions.checkNotNull(tasks, "tasks cannot be null");
+ this.groupingKey = groupingKey;
this.tasks = tasks.toArray();
}
+ public BaseScanTaskGroup(Collection<T> tasks) {
+ this(EmptyStructLike.get(), tasks);
+ }
+
+ @Override
+ public StructLike groupingKey() {
+ return groupingKey;
+ }
+
@Override
@SuppressWarnings("unchecked")
public Collection<T> tasks() {
diff --git a/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java b/api/src/main/java/org/apache/iceberg/EmptyStructLike.java
similarity index 51%
copy from api/src/main/java/org/apache/iceberg/ScanTaskGroup.java
copy to api/src/main/java/org/apache/iceberg/EmptyStructLike.java
index 791efe35e6..2d57f4c01a 100644
--- a/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java
+++ b/api/src/main/java/org/apache/iceberg/EmptyStructLike.java
@@ -18,29 +18,49 @@
*/
package org.apache.iceberg;
-import java.util.Collection;
+import java.io.Serializable;
-/**
- * A scan task that may include partial input files, multiple input files or both.
- *
- * @param <T> the type of scan tasks
- */
-public interface ScanTaskGroup<T extends ScanTask> extends ScanTask {
- /** Returns scan tasks in this group. */
- Collection<T> tasks();
+class EmptyStructLike implements StructLike, Serializable {
+
+ private static final EmptyStructLike INSTANCE = new EmptyStructLike();
+
+ private EmptyStructLike() {}
+
+ static EmptyStructLike get() {
+ return INSTANCE;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ throw new UnsupportedOperationException("Can't retrieve values from an empty struct");
+ }
@Override
- default long sizeBytes() {
- return tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("Can't modify an empty struct");
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ return other != null && getClass() == other.getClass();
}
@Override
- default long estimatedRowsCount() {
- return tasks().stream().mapToLong(ScanTask::estimatedRowsCount).sum();
+ public int hashCode() {
+ return 0;
}
@Override
- default int filesCount() {
- return tasks().stream().mapToInt(ScanTask::filesCount).sum();
+ public String toString() {
+ return "StructLike{}";
}
}
diff --git a/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java b/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java
index 791efe35e6..06189083d8 100644
--- a/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java
+++ b/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java
@@ -26,6 +26,23 @@ import java.util.Collection;
* @param <T> the type of scan tasks
*/
public interface ScanTaskGroup<T extends ScanTask> extends ScanTask {
+ /**
+ * Returns a grouping key for this task group.
+ *
+ * <p>A grouping key is a set of values that are common amongst all rows produced by the tasks in
+ * this task group. The values may be the result of transforming the underlying data. For example,
+ * a grouping key can consist of a bucket ordinal computed by applying a bucket transform to a
+ * column of the underlying rows. The grouping key type is determined at planning time and is
+ * identical across all task groups produced by a scan.
+ *
+ * <p>Implementations should return an empty struct if the data grouping is random or unknown.
+ *
+ * @return a grouping key for this task group
+ */
+ default StructLike groupingKey() {
+ return EmptyStructLike.get();
+ }
+
/** Returns scan tasks in this group. */
Collection<T> tasks();
diff --git a/core/src/main/java/org/apache/iceberg/PartitionData.java b/core/src/main/java/org/apache/iceberg/PartitionData.java
index afee65cd9f..639fdeacd7 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionData.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionData.java
@@ -35,7 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
-class PartitionData
+public class PartitionData
implements IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
static Schema partitionDataSchema(Types.StructType partitionType) {
@@ -57,7 +57,7 @@ class PartitionData
this.schema = schema;
}
- PartitionData(Types.StructType partitionType) {
+ public PartitionData(Types.StructType partitionType) {
for (Types.NestedField field : partitionType.fields()) {
Preconditions.checkArgument(
field.type().isPrimitiveType(),
diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index b54b06a3a6..5541468be4 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -28,11 +28,13 @@ import org.apache.iceberg.ContentFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MergeableScanTask;
+import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.SplittableScanTask;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
@@ -140,7 +142,7 @@ public class TableScanUtil {
long splitSize,
int lookback,
long openFileCost,
- Types.StructType projectedPartitionType) {
+ Types.StructType groupingKeyType) {
Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
Preconditions.checkArgument(
@@ -151,38 +153,69 @@ public class TableScanUtil {
Function<T, Long> weightFunc =
task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
- Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+ Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap();
- // Group tasks by their partition values
- StructLikeMap<List<T>> tasksByPartition = StructLikeMap.create(projectedPartitionType);
+ // group tasks by grouping keys derived from their partition tuples
+ StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType);
for (T task : tasks) {
PartitionSpec spec = task.spec();
- StructProjection projectedStruct =
- projectionsBySpec.computeIfAbsent(
+ StructLike partition = task.partition();
+ StructProjection groupingKeyProjection =
+ groupingKeyProjectionsBySpec.computeIfAbsent(
spec.specId(),
- specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
- List<T> taskList =
- tasksByPartition.computeIfAbsent(
- projectedStruct.copyFor(task.partition()), k -> Lists.newArrayList());
+ specId -> StructProjection.create(spec.partitionType(), groupingKeyType));
+ List<T> groupingKeyTasks =
+ tasksByGroupingKey.computeIfAbsent(
+ projectGroupingKey(groupingKeyProjection, groupingKeyType, partition),
+ groupingKey -> Lists.newArrayList());
if (task instanceof SplittableScanTask<?>) {
- ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(taskList::add);
+ ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(groupingKeyTasks::add);
} else {
- taskList.add(task);
+ groupingKeyTasks.add(task);
}
}
- // Now apply task combining within each partition
- return FluentIterable.from(tasksByPartition.values())
- .transformAndConcat(ts -> toTaskGroupIterable(ts, splitSize, lookback, weightFunc))
- .toList();
+ List<ScanTaskGroup<T>> taskGroups = Lists.newArrayList();
+
+ for (Map.Entry<StructLike, List<T>> entry : tasksByGroupingKey.entrySet()) {
+ StructLike groupingKey = entry.getKey();
+ List<T> groupingKeyTasks = entry.getValue();
+ Iterables.addAll(
+ taskGroups,
+ toTaskGroupIterable(groupingKey, groupingKeyTasks, splitSize, lookback, weightFunc));
+ }
+
+ return taskGroups;
+ }
+
+ private static StructLike projectGroupingKey(
+ StructProjection groupingKeyProjection,
+ Types.StructType groupingKeyType,
+ StructLike partition) {
+
+ PartitionData groupingKey = new PartitionData(groupingKeyType);
+
+ groupingKeyProjection.wrap(partition);
+
+ for (int pos = 0; pos < groupingKeyProjection.size(); pos++) {
+ Class<?> javaClass = groupingKey.getType(pos).typeId().javaClass();
+ groupingKey.set(pos, groupingKeyProjection.get(pos, javaClass));
+ }
+
+ return groupingKey;
}
private static <T extends ScanTask> Iterable<ScanTaskGroup<T>> toTaskGroupIterable(
- Iterable<T> tasks, long splitSize, int lookback, Function<T, Long> weightFunc) {
+ StructLike groupingKey,
+ Iterable<T> tasks,
+ long splitSize,
+ int lookback,
+ Function<T, Long> weightFunc) {
+
return Iterables.transform(
new BinPacking.PackingIterable<>(tasks, splitSize, lookback, weightFunc, true),
- combinedTasks -> new BaseScanTaskGroup<>(mergeTasks(combinedTasks)));
+ combinedTasks -> new BaseScanTaskGroup<>(groupingKey, mergeTasks(combinedTasks)));
}
@SuppressWarnings("unchecked")