Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/11/28 22:33:11 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #6304: API, Core: Add groupingKey to ScanTaskGroup

aokolnychyi opened a new pull request, #6304:
URL: https://github.com/apache/iceberg/pull/6304

   This PR adds `groupingKey` to `ScanTaskGroup` so that we can later report it to query engines.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1034126627


##########
api/src/main/java/org/apache/iceberg/ScanTaskGroup.java:
##########
@@ -26,6 +26,24 @@
  * @param <T> the type of scan tasks
  */
 public interface ScanTaskGroup<T extends ScanTask> extends ScanTask {
+  /**
+   * Returns a grouping key for this task group.
+   *
+   * <p>Tasks may produce records that belong to a specific group of data (e.g. partition). It may
+   * be beneficial to preserve that grouping while combining tasks into groups if query engines can
+   * leverage that information to avoid shuffles. Grouping keys indicate how data is split between
+   * different task groups. The grouping key type is determined at planning time and is identical
+   * across all task groups produced by a scan.
+   *
+   * <p>All records produced by tasks in this group are guaranteed to have the reported grouping
+   * key. Implementations should return an empty struct if the data grouping is random or unknown.
+   *
+   * @return a grouping key for this task group
+   */
+  default StructLike groupingKey() {

Review Comment:
   I also considered calling it `partitionKey`. However, I went for `groupingKey`:
   - It is more generic. We may expose grouping on non-partition attributes in the future.
   - It matches `Partitioning$groupingKeyType`, which is inspired by `KeyGroupedPartitioning` in Spark.
   - It avoids confusion between `partitionKey` and `partition` (reported by `PartitionScanTask`).
   - Calling it `partitionKey` may be misleading as only non-void transforms participate in grouping.
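
For illustration only, here is a minimal sketch of how an engine might consume the grouping key described in the Javadoc above. It uses no Iceberg classes: `TaskGroup`, `KeyedGroup`, `UnkeyedGroup`, `bucketByKey`, and `canReportGrouping` are hypothetical names, and a plain `List<Object>` stands in for `StructLike`. The only behavior taken from the PR is the default of returning an empty key when the grouping is random or unknown.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingKeySketch {
  // Hypothetical stand-in for ScanTaskGroup; List<Object> plays the role of StructLike.
  public interface TaskGroup {
    String name();

    // Mirrors the documented default: an empty key means the grouping is random or unknown.
    default List<Object> groupingKey() {
      return List.of();
    }
  }

  // A task group whose records all share one known grouping key.
  public static final class KeyedGroup implements TaskGroup {
    private final String name;
    private final List<Object> key;

    public KeyedGroup(String name, List<Object> key) {
      this.name = name;
      this.key = key;
    }

    @Override
    public String name() {
      return name;
    }

    @Override
    public List<Object> groupingKey() {
      return key;
    }
  }

  // A task group that relies on the default (empty) grouping key.
  public static final class UnkeyedGroup implements TaskGroup {
    private final String name;

    public UnkeyedGroup(String name) {
      this.name = name;
    }

    @Override
    public String name() {
      return name;
    }
  }

  // Buckets task group names by their reported grouping key, preserving encounter order.
  public static Map<List<Object>, List<String>> bucketByKey(List<TaskGroup> groups) {
    Map<List<Object>, List<String>> buckets = new LinkedHashMap<>();
    for (TaskGroup group : groups) {
      buckets.computeIfAbsent(group.groupingKey(), key -> new ArrayList<>()).add(group.name());
    }
    return buckets;
  }

  // An engine could only rely on the grouping if every task group reports a non-empty key.
  public static boolean canReportGrouping(List<TaskGroup> groups) {
    return !groups.isEmpty() && groups.stream().noneMatch(g -> g.groupingKey().isEmpty());
  }
}
```

In this sketch, a single task group without a key is enough to make `canReportGrouping` return false, which matches the idea that the grouping is only useful when it holds for the whole scan.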





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1035293150


##########
api/src/main/java/org/apache/iceberg/ScanTaskGroup.java:
##########
@@ -26,6 +26,24 @@
  * @param <T> the type of scan tasks
  */
 public interface ScanTaskGroup<T extends ScanTask> extends ScanTask {
+  /**
+   * Returns a grouping key for this task group.
+   *
+   * <p>Tasks may produce records that belong to a specific group of data (e.g. partition). It may

Review Comment:
   I agree. Changed.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1034133132


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -151,38 +153,68 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
-    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+    Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap();
 
-    // Group tasks by their partition values
-    StructLikeMap<List<T>> tasksByPartition = StructLikeMap.create(projectedPartitionType);
+    // group tasks by grouping keys derived from their partition tuples
+    StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType);
 
     for (T task : tasks) {
       PartitionSpec spec = task.spec();
-      StructProjection projectedStruct =
-          projectionsBySpec.computeIfAbsent(
+      StructProjection groupingKeyProjection =
+          groupingKeyProjectionsBySpec.computeIfAbsent(
               spec.specId(),
-              specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
-      List<T> taskList =
-          tasksByPartition.computeIfAbsent(
-              projectedStruct.copyFor(task.partition()), k -> Lists.newArrayList());

Review Comment:
   We were using `StructProjection`, which lazily projects values. However, it is not serializable, so I can't pass it into task groups. Instead of creating a copy of the projection, I eagerly project values into `PartitionData`.
   
   See `projectGroupingKey` below.
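
To make the lazy-versus-eager distinction concrete, here is a simplified sketch that does not use the Iceberg classes. `LazyProjection` and `KeyData` are hypothetical stand-ins for `StructProjection` and `PartitionData`, and `projectEagerly` only approximates what `projectGroupingKey` presumably does: wrap the partition tuple, then copy each projected value into a serializable holder.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class EagerProjectionSketch {
  // Hypothetical lazy view: remaps positions on every read and keeps a reference to the source.
  // It is deliberately not Serializable, like the projection discussed in the comment.
  public static final class LazyProjection {
    private final int[] positions;
    private Object[] source;

    public LazyProjection(int... positions) {
      this.positions = positions;
    }

    public LazyProjection wrap(Object[] newSource) {
      this.source = newSource;
      return this;
    }

    public int size() {
      return positions.length;
    }

    public Object get(int pos) {
      return source[positions[pos]];
    }
  }

  // Hypothetical serializable holder that owns its own copy of the projected values.
  public static final class KeyData implements Serializable {
    private final Object[] values;

    public KeyData(int size) {
      this.values = new Object[size];
    }

    public void set(int pos, Object value) {
      values[pos] = value;
    }

    public Object get(int pos) {
      return values[pos];
    }
  }

  // Eagerly copies the projected values so the result no longer depends on the projection.
  public static KeyData projectEagerly(LazyProjection projection, Object[] partition) {
    projection.wrap(partition);
    KeyData key = new KeyData(projection.size());
    for (int pos = 0; pos < projection.size(); pos++) {
      key.set(pos, projection.get(pos));
    }
    return key;
  }

  // Returns true if Java serialization accepts the object, false if it is not serializable.
  public static boolean isJavaSerializable(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The copied key stays stable even if the projection is later re-wrapped around another partition tuple, which is the property a task group needs when it is shipped to executors.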





[GitHub] [iceberg] aokolnychyi commented on pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#issuecomment-1329863109

   cc @RussellSpitzer @szehon-ho @flyrain @stevenzwu @rdblue




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1035356882


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -151,38 +153,68 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
-    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+    Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap();
 
-    // Group tasks by their partition values
-    StructLikeMap<List<T>> tasksByPartition = StructLikeMap.create(projectedPartitionType);
+    // group tasks by grouping keys derived from their partition tuples
+    StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType);
 
     for (T task : tasks) {
       PartitionSpec spec = task.spec();
-      StructProjection projectedStruct =
-          projectionsBySpec.computeIfAbsent(
+      StructProjection groupingKeyProjection =
+          groupingKeyProjectionsBySpec.computeIfAbsent(
               spec.specId(),
-              specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
-      List<T> taskList =
-          tasksByPartition.computeIfAbsent(
-              projectedStruct.copyFor(task.partition()), k -> Lists.newArrayList());
+              specId -> StructProjection.create(spec.partitionType(), groupingKeyType));
+      List<T> groupingKeyTasks =
+          tasksByGroupingKey.computeIfAbsent(
+              projectGroupingKey(groupingKeyProjection, groupingKeyType, task),
+              groupingKey -> Lists.newArrayList());
       if (task instanceof SplittableScanTask<?>) {
-        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(taskList::add);
+        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(groupingKeyTasks::add);
       } else {
-        taskList.add(task);
+        groupingKeyTasks.add(task);
       }
     }
 
-    // Now apply task combining within each partition
-    return FluentIterable.from(tasksByPartition.values())
-        .transformAndConcat(ts -> toTaskGroupIterable(ts, splitSize, lookback, weightFunc))
-        .toList();
+    List<ScanTaskGroup<T>> taskGroups = Lists.newArrayList();
+
+    for (Map.Entry<StructLike, List<T>> entry : tasksByGroupingKey.entrySet()) {
+      StructLike groupingKey = entry.getKey();
+      List<T> groupingKeyTasks = entry.getValue();
+      Iterables.addAll(
+          taskGroups,
+          toTaskGroupIterable(groupingKey, groupingKeyTasks, splitSize, lookback, weightFunc));
+    }
+
+    return taskGroups;
+  }
+
+  private static StructLike projectGroupingKey(
+      StructProjection groupingKeyProjection,
+      Types.StructType groupingKeyType,
+      PartitionScanTask task) {
+
+    PartitionData groupingKey = new PartitionData(groupingKeyType);
+
+    groupingKeyProjection.wrap(task.partition());

Review Comment:
   Yeah, this is a really minor comment. Please go ahead with merging.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1034126627


##########
api/src/main/java/org/apache/iceberg/ScanTaskGroup.java:
##########
@@ -26,6 +26,24 @@
  * @param <T> the type of scan tasks
  */
 public interface ScanTaskGroup<T extends ScanTask> extends ScanTask {
+  /**
+   * Returns a grouping key for this task group.
+   *
+   * <p>Tasks may produce records that belong to a specific group of data (e.g. partition). It may
+   * be beneficial to preserve that grouping while combining tasks into groups if query engines can
+   * leverage that information to avoid shuffles. Grouping keys indicate how data is split between
+   * different task groups. The grouping key type is determined at planning time and is identical
+   * across all task groups produced by a scan.
+   *
+   * <p>All records produced by tasks in this group are guaranteed to have the reported grouping
+   * key. Implementations should return an empty struct if the data grouping is random or unknown.
+   *
+   * @return a grouping key for this task group
+   */
+  default StructLike groupingKey() {

Review Comment:
   I also considered calling it `partitionKey`. However, I went for `groupingKey`:
   - It is more generic. We may expose grouping on non-partition attributes in the future.
   - It matches `Partitioning$groupingKeyType`, which is inspired by `KeyGroupedPartitioning` in Spark.
   - It avoids confusion between `partitionKey` and `partition` (reported by `PartitionScanTask`).
   - Calling it `partitionKey` may be misleading as we consider only non-void transforms.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1035295892


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -151,38 +153,68 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
-    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+    Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap();
 
-    // Group tasks by their partition values
-    StructLikeMap<List<T>> tasksByPartition = StructLikeMap.create(projectedPartitionType);
+    // group tasks by grouping keys derived from their partition tuples
+    StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType);
 
     for (T task : tasks) {
       PartitionSpec spec = task.spec();
-      StructProjection projectedStruct =
-          projectionsBySpec.computeIfAbsent(
+      StructProjection groupingKeyProjection =
+          groupingKeyProjectionsBySpec.computeIfAbsent(
               spec.specId(),
-              specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
-      List<T> taskList =
-          tasksByPartition.computeIfAbsent(
-              projectedStruct.copyFor(task.partition()), k -> Lists.newArrayList());
+              specId -> StructProjection.create(spec.partitionType(), groupingKeyType));
+      List<T> groupingKeyTasks =
+          tasksByGroupingKey.computeIfAbsent(
+              projectGroupingKey(groupingKeyProjection, groupingKeyType, task),
+              groupingKey -> Lists.newArrayList());
       if (task instanceof SplittableScanTask<?>) {
-        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(taskList::add);
+        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(groupingKeyTasks::add);
       } else {
-        taskList.add(task);
+        groupingKeyTasks.add(task);
       }
     }
 
-    // Now apply task combining within each partition
-    return FluentIterable.from(tasksByPartition.values())
-        .transformAndConcat(ts -> toTaskGroupIterable(ts, splitSize, lookback, weightFunc))
-        .toList();
+    List<ScanTaskGroup<T>> taskGroups = Lists.newArrayList();
+
+    for (Map.Entry<StructLike, List<T>> entry : tasksByGroupingKey.entrySet()) {
+      StructLike groupingKey = entry.getKey();
+      List<T> groupingKeyTasks = entry.getValue();
+      Iterables.addAll(
+          taskGroups,
+          toTaskGroupIterable(groupingKey, groupingKeyTasks, splitSize, lookback, weightFunc));
+    }
+
+    return taskGroups;
+  }
+
+  private static StructLike projectGroupingKey(
+      StructProjection groupingKeyProjection,
+      Types.StructType groupingKeyType,
+      PartitionScanTask task) {
+
+    PartitionData groupingKey = new PartitionData(groupingKeyType);
+
+    groupingKeyProjection.wrap(task.partition());

Review Comment:
   I tried, but then the helper method started to look a bit weird, so I'd have to play around with the naming more. I agree there is no need to pass the task, so I changed the method to accept a partition. It now seems reasonable to have a method that projects a grouping key from the partition tuple.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1035346108


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -151,38 +153,68 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
-    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+    Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap();
 
-    // Group tasks by their partition values
-    StructLikeMap<List<T>> tasksByPartition = StructLikeMap.create(projectedPartitionType);
+    // group tasks by grouping keys derived from their partition tuples
+    StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType);
 
     for (T task : tasks) {
       PartitionSpec spec = task.spec();
-      StructProjection projectedStruct =
-          projectionsBySpec.computeIfAbsent(
+      StructProjection groupingKeyProjection =
+          groupingKeyProjectionsBySpec.computeIfAbsent(
               spec.specId(),
-              specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
-      List<T> taskList =
-          tasksByPartition.computeIfAbsent(
-              projectedStruct.copyFor(task.partition()), k -> Lists.newArrayList());
+              specId -> StructProjection.create(spec.partitionType(), groupingKeyType));
+      List<T> groupingKeyTasks =
+          tasksByGroupingKey.computeIfAbsent(
+              projectGroupingKey(groupingKeyProjection, groupingKeyType, task),
+              groupingKey -> Lists.newArrayList());
       if (task instanceof SplittableScanTask<?>) {
-        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(taskList::add);
+        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(groupingKeyTasks::add);
       } else {
-        taskList.add(task);
+        groupingKeyTasks.add(task);
       }
     }
 
-    // Now apply task combining within each partition
-    return FluentIterable.from(tasksByPartition.values())
-        .transformAndConcat(ts -> toTaskGroupIterable(ts, splitSize, lookback, weightFunc))
-        .toList();
+    List<ScanTaskGroup<T>> taskGroups = Lists.newArrayList();
+
+    for (Map.Entry<StructLike, List<T>> entry : tasksByGroupingKey.entrySet()) {
+      StructLike groupingKey = entry.getKey();
+      List<T> groupingKeyTasks = entry.getValue();
+      Iterables.addAll(
+          taskGroups,
+          toTaskGroupIterable(groupingKey, groupingKeyTasks, splitSize, lookback, weightFunc));
+    }
+
+    return taskGroups;
+  }
+
+  private static StructLike projectGroupingKey(
+      StructProjection groupingKeyProjection,
+      Types.StructType groupingKeyType,
+      PartitionScanTask task) {
+
+    PartitionData groupingKey = new PartitionData(groupingKeyType);
+
+    groupingKeyProjection.wrap(task.partition());

Review Comment:
   Correct, but this would mean I no longer project the key in that method, so I'd have to rename it to something like `copyAsPartitionData`. That would disconnect the method from the fact that we project this key from the partition, so using `PartitionData` for the key would be weird. On top of that, Spotless formatted the new code in a less readable way, which I can't change.
   
   I see your point but given that I can't change Spotless formatting, do you mind if I keep it this way?





[GitHub] [iceberg] aokolnychyi commented on pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#issuecomment-1331469441

   Thanks a lot for reviewing, @stevenzwu @nastra @flyrain @RussellSpitzer!




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1034128298


##########
core/src/main/java/org/apache/iceberg/PartitionData.java:
##########
@@ -35,7 +35,7 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
-class PartitionData
+public class PartitionData

Review Comment:
   I had to make this public as `TableScanUtil` is in another package and I needed a serializable holder for partition data.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1034129295


##########
api/src/main/java/org/apache/iceberg/ScanTaskGroup.java:
##########
@@ -26,6 +26,24 @@
  * @param <T> the type of scan tasks
  */
 public interface ScanTaskGroup<T extends ScanTask> extends ScanTask {
+  /**
+   * Returns a grouping key for this task group.
+   *
+   * <p>Tasks may produce records that belong to a specific group of data (e.g. partition). It may
+   * be beneficial to preserve that grouping while combining tasks into groups if query engines can
+   * leverage that information to avoid shuffles. Grouping keys indicate how data is split between
+   * different task groups. The grouping key type is determined at planning time and is identical
+   * across all task groups produced by a scan.
+   *
+   * <p>All records produced by tasks in this group are guaranteed to have the reported grouping
+   * key. Implementations should return an empty struct if the data grouping is random or unknown.
+   *
+   * @return a grouping key for this task group
+   */
+  default StructLike groupingKey() {

Review Comment:
   +1. It matches the interface name `ScanTaskGroup` better.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1034136750


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -151,38 +153,68 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
-    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+    Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap();
 
-    // Group tasks by their partition values
-    StructLikeMap<List<T>> tasksByPartition = StructLikeMap.create(projectedPartitionType);
+    // group tasks by grouping keys derived from their partition tuples
+    StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType);
 
     for (T task : tasks) {
       PartitionSpec spec = task.spec();
-      StructProjection projectedStruct =
-          projectionsBySpec.computeIfAbsent(
+      StructProjection groupingKeyProjection =
+          groupingKeyProjectionsBySpec.computeIfAbsent(
               spec.specId(),
-              specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
-      List<T> taskList =
-          tasksByPartition.computeIfAbsent(
-              projectedStruct.copyFor(task.partition()), k -> Lists.newArrayList());
+              specId -> StructProjection.create(spec.partitionType(), groupingKeyType));
+      List<T> groupingKeyTasks =
+          tasksByGroupingKey.computeIfAbsent(
+              projectGroupingKey(groupingKeyProjection, groupingKeyType, task),
+              groupingKey -> Lists.newArrayList());
       if (task instanceof SplittableScanTask<?>) {
-        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(taskList::add);
+        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(groupingKeyTasks::add);
       } else {
-        taskList.add(task);
+        groupingKeyTasks.add(task);
       }
     }
 
-    // Now apply task combining within each partition
-    return FluentIterable.from(tasksByPartition.values())
-        .transformAndConcat(ts -> toTaskGroupIterable(ts, splitSize, lookback, weightFunc))
-        .toList();
+    List<ScanTaskGroup<T>> taskGroups = Lists.newArrayList();
+
+    for (Map.Entry<StructLike, List<T>> entry : tasksByGroupingKey.entrySet()) {
+      StructLike groupingKey = entry.getKey();
+      List<T> groupingKeyTasks = entry.getValue();
+      Iterables.addAll(
+          taskGroups,
+          toTaskGroupIterable(groupingKey, groupingKeyTasks, splitSize, lookback, weightFunc));
+    }
+
+    return taskGroups;
+  }
+
+  private static StructLike projectGroupingKey(
+      StructProjection groupingKeyProjection,
+      Types.StructType groupingKeyType,
+      PartitionScanTask task) {
+
+    PartitionData groupingKey = new PartitionData(groupingKeyType);
+
+    groupingKeyProjection.wrap(task.partition());

Review Comment:
   nit: it seems more natural to complete the `groupingKeyProjection` construction outside this method.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1034135458


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -151,38 +153,68 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
-    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+    Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap();
 
-    // Group tasks by their partition values
-    StructLikeMap<List<T>> tasksByPartition = StructLikeMap.create(projectedPartitionType);
+    // group tasks by grouping keys derived from their partition tuples
+    StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType);
 
     for (T task : tasks) {
       PartitionSpec spec = task.spec();
-      StructProjection projectedStruct =
-          projectionsBySpec.computeIfAbsent(
+      StructProjection groupingKeyProjection =
+          groupingKeyProjectionsBySpec.computeIfAbsent(
               spec.specId(),
-              specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
-      List<T> taskList =
-          tasksByPartition.computeIfAbsent(
-              projectedStruct.copyFor(task.partition()), k -> Lists.newArrayList());
+              specId -> StructProjection.create(spec.partitionType(), groupingKeyType));
+      List<T> groupingKeyTasks =
+          tasksByGroupingKey.computeIfAbsent(
+              projectGroupingKey(groupingKeyProjection, groupingKeyType, task),
+              groupingKey -> Lists.newArrayList());
       if (task instanceof SplittableScanTask<?>) {
-        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(taskList::add);
+        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(groupingKeyTasks::add);
       } else {
-        taskList.add(task);
+        groupingKeyTasks.add(task);
       }
     }
 
-    // Now apply task combining within each partition
-    return FluentIterable.from(tasksByPartition.values())

Review Comment:
   I need to pass more params around, so `FluentIterable` no longer gives a concise syntax.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1034121591


##########
api/src/main/java/org/apache/iceberg/EmptyStructLike.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.Serializable;
+
+class EmptyStructLike implements StructLike, Serializable {
+
+  private static final EmptyStructLike INSTANCE = new EmptyStructLike();
+
+  private EmptyStructLike() {}
+
+  static EmptyStructLike get() {
+    return INSTANCE;
+  }
+
+  @Override
+  public int size() {
+    return 0;
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    throw new UnsupportedOperationException("Can't retrieve values from an empty struct");
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    throw new UnsupportedOperationException("Can't modify an empty struct");
+  }
+
+  @Override
+  public boolean equals(Object other) {

Review Comment:
   I implemented `equals` and `hashCode` to avoid surprises with equality after serialization/deserialization.
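
The concern in this comment can be illustrated with a minimal sketch. It assumes plain Java serialization and uses a hypothetical stand-in class, `EmptyKey`, rather than the `EmptyStructLike` from the PR; the `equals` and `hashCode` bodies below are an assumption, not the merged implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a serializable singleton such as EmptyStructLike.
final class EmptyKey implements Serializable {
  private static final EmptyKey INSTANCE = new EmptyKey();

  private EmptyKey() {}

  static EmptyKey get() {
    return INSTANCE;
  }

  // Assumed implementation: all empty keys are equal, whichever instance they are.
  @Override
  public boolean equals(Object other) {
    return other != null && getClass() == other.getClass();
  }

  @Override
  public int hashCode() {
    return 0;
  }
}

final class EmptyKeyRoundTrip {
  // Serializes and deserializes a value in memory with java.io serialization.
  static Object roundTrip(Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }
}
```

Without a `readResolve` method, deserialization creates a second instance, so identity comparison with `EmptyKey.get()` fails while `equals` still holds. Adding `readResolve` would be an alternative way to preserve the singleton.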





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1035293785


##########
api/src/main/java/org/apache/iceberg/ScanTaskGroup.java:
##########
@@ -26,6 +26,24 @@
  * @param <T> the type of scan tasks
  */
 public interface ScanTaskGroup<T extends ScanTask> extends ScanTask {
+  /**
+   * Returns a grouping key for this task group.
+   *
+   * <p>Tasks may produce records that belong to a specific group of data (e.g. partition). It may
+   * be beneficial to preserve that grouping while combining tasks into groups if query engines can
+   * leverage that information to avoid shuffles. Grouping keys indicate how data is split between

Review Comment:
   Reworked the doc.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1034136750


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -151,38 +153,68 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
-    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+    Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap();
 
-    // Group tasks by their partition values
-    StructLikeMap<List<T>> tasksByPartition = StructLikeMap.create(projectedPartitionType);
+    // group tasks by grouping keys derived from their partition tuples
+    StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType);
 
     for (T task : tasks) {
       PartitionSpec spec = task.spec();
-      StructProjection projectedStruct =
-          projectionsBySpec.computeIfAbsent(
+      StructProjection groupingKeyProjection =
+          groupingKeyProjectionsBySpec.computeIfAbsent(
               spec.specId(),
-              specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
-      List<T> taskList =
-          tasksByPartition.computeIfAbsent(
-              projectedStruct.copyFor(task.partition()), k -> Lists.newArrayList());
+              specId -> StructProjection.create(spec.partitionType(), groupingKeyType));
+      List<T> groupingKeyTasks =
+          tasksByGroupingKey.computeIfAbsent(
+              projectGroupingKey(groupingKeyProjection, groupingKeyType, task),
+              groupingKey -> Lists.newArrayList());
       if (task instanceof SplittableScanTask<?>) {
-        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(taskList::add);
+        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(groupingKeyTasks::add);
       } else {
-        taskList.add(task);
+        groupingKeyTasks.add(task);
       }
     }
 
-    // Now apply task combining within each partition
-    return FluentIterable.from(tasksByPartition.values())
-        .transformAndConcat(ts -> toTaskGroupIterable(ts, splitSize, lookback, weightFunc))
-        .toList();
+    List<ScanTaskGroup<T>> taskGroups = Lists.newArrayList();
+
+    for (Map.Entry<StructLike, List<T>> entry : tasksByGroupingKey.entrySet()) {
+      StructLike groupingKey = entry.getKey();
+      List<T> groupingKeyTasks = entry.getValue();
+      Iterables.addAll(
+          taskGroups,
+          toTaskGroupIterable(groupingKey, groupingKeyTasks, splitSize, lookback, weightFunc));
+    }
+
+    return taskGroups;
+  }
+
+  private static StructLike projectGroupingKey(
+      StructProjection groupingKeyProjection,
+      Types.StructType groupingKeyType,
+      PartitionScanTask task) {
+
+    PartitionData groupingKey = new PartitionData(groupingKeyType);
+
+    groupingKeyProjection.wrap(task.partition());

Review Comment:
   nit: it seems more natural to complete the `groupingKeyProjection` construction outside this method, which avoids the need to pass in `PartitionScanTask`.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1034131101


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -140,7 +142,7 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
       long splitSize,
       int lookback,
       long openFileCost,
-      Types.StructType projectedPartitionType) {
+      Types.StructType groupingKeyType) {

Review Comment:
   Initially, we did not have much clarity on what we wanted to call it. Renaming for consistency with other places.





[GitHub] [iceberg] flyrain commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1034212530


##########
api/src/main/java/org/apache/iceberg/ScanTaskGroup.java:
##########
@@ -26,6 +26,24 @@
  * @param <T> the type of scan tasks
  */
 public interface ScanTaskGroup<T extends ScanTask> extends ScanTask {
+  /**
+   * Returns a grouping key for this task group.
+   *
+   * <p>Tasks may produce records that belong to a specific group of data (e.g. partition). It may
+   * be beneficial to preserve that grouping while combining tasks into groups if query engines can
+   * leverage that information to avoid shuffles. Grouping keys indicate how data is split between

Review Comment:
   Nit: 
   `It may be beneficial to preserve that grouping while combining tasks into groups if query engines can leverage that information to avoid shuffles.` ->
   `Query engines could leverage the grouping key to avoid shuffles by combining tasks within the same group.` ?





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1034950972


##########
api/src/main/java/org/apache/iceberg/ScanTaskGroup.java:
##########
@@ -26,6 +26,24 @@
  * @param <T> the type of scan tasks
  */
 public interface ScanTaskGroup<T extends ScanTask> extends ScanTask {
+  /**
+   * Returns a grouping key for this task group.
+   *
+   * <p>Tasks may produce records that belong to a specific group of data (e.g. partition). It may

Review Comment:
   This description is a little too abstract for me to understand what the grouping key is from this doc. I think grouping key needs to be defined first here. Maybe 
   
   "A grouping key is a set of values that are common amongst all rows within all tasks within the group. The values may be the result of transforming the underlying data. For example, a grouping key may have the bucket ordinal computed by applying a bucketing transformation to a column of the underlying rows."
   
   Wdyt?
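
To make the proposed definition concrete, here is a rough sketch of grouping values by a bucket ordinal, so that each group shares one value that can serve as its grouping key. The class `BucketGroupingSketch` and its `bucket` function are hypothetical; the hash used here is a placeholder and is not the hash that Iceberg's bucket transform applies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration; not part of the Iceberg API.
final class BucketGroupingSketch {
  // Placeholder bucketing function (assumption: any deterministic hash works for the sketch).
  static int bucket(String value, int numBuckets) {
    return Math.floorMod(value.hashCode(), numBuckets);
  }

  // Groups values by bucket ordinal; the map key plays the role of the grouping key.
  static Map<Integer, List<String>> groupByBucket(List<String> values, int numBuckets) {
    Map<Integer, List<String>> groups = new TreeMap<>();
    for (String value : values) {
      groups.computeIfAbsent(bucket(value, numBuckets), key -> new ArrayList<>()).add(value);
    }
    return groups;
  }
}
```

Every value in a group maps to the same bucket ordinal, which is the property the suggested doc text describes: the key is derived from the data by a transform and is common to all rows in the group.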





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6304:
URL: https://github.com/apache/iceberg/pull/6304#discussion_r1035310165


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -151,38 +153,68 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
-    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+    Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap();
 
-    // Group tasks by their partition values
-    StructLikeMap<List<T>> tasksByPartition = StructLikeMap.create(projectedPartitionType);
+    // group tasks by grouping keys derived from their partition tuples
+    StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType);
 
     for (T task : tasks) {
       PartitionSpec spec = task.spec();
-      StructProjection projectedStruct =
-          projectionsBySpec.computeIfAbsent(
+      StructProjection groupingKeyProjection =
+          groupingKeyProjectionsBySpec.computeIfAbsent(
               spec.specId(),
-              specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
-      List<T> taskList =
-          tasksByPartition.computeIfAbsent(
-              projectedStruct.copyFor(task.partition()), k -> Lists.newArrayList());
+              specId -> StructProjection.create(spec.partitionType(), groupingKeyType));
+      List<T> groupingKeyTasks =
+          tasksByGroupingKey.computeIfAbsent(
+              projectGroupingKey(groupingKeyProjection, groupingKeyType, task),
+              groupingKey -> Lists.newArrayList());
       if (task instanceof SplittableScanTask<?>) {
-        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(taskList::add);
+        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(groupingKeyTasks::add);
       } else {
-        taskList.add(task);
+        groupingKeyTasks.add(task);
       }
     }
 
-    // Now apply task combining within each partition
-    return FluentIterable.from(tasksByPartition.values())
-        .transformAndConcat(ts -> toTaskGroupIterable(ts, splitSize, lookback, weightFunc))
-        .toList();
+    List<ScanTaskGroup<T>> taskGroups = Lists.newArrayList();
+
+    for (Map.Entry<StructLike, List<T>> entry : tasksByGroupingKey.entrySet()) {
+      StructLike groupingKey = entry.getKey();
+      List<T> groupingKeyTasks = entry.getValue();
+      Iterables.addAll(
+          taskGroups,
+          toTaskGroupIterable(groupingKey, groupingKeyTasks, splitSize, lookback, weightFunc));
+    }
+
+    return taskGroups;
+  }
+
+  private static StructLike projectGroupingKey(
+      StructProjection groupingKeyProjection,
+      Types.StructType groupingKeyType,
+      PartitionScanTask task) {
+
+    PartitionData groupingKey = new PartitionData(groupingKeyType);
+
+    groupingKeyProjection.wrap(task.partition());

Review Comment:
   I was suggesting calling `wrap` outside this method, e.g. right before line 168. `groupingKeyProjection` is only read inside this method, so it can be fully constructed before the call. Then we don't need to pass `task` or `partition` into this method.
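
The shape of this suggestion can be sketched as follows. The types here are hypothetical stand-ins (`KeyProjection` for a reusable projection, `GroupingKeys.copyKey` for the helper) and do not use the Iceberg classes from the diff; the point is only that the caller wraps the partition and the helper reads from the wrapped projection.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a reusable projection over a partition tuple.
final class KeyProjection {
  private final int[] sourcePositions;
  private Object[] wrapped;

  KeyProjection(int... sourcePositions) {
    this.sourcePositions = sourcePositions.clone();
  }

  // Points the projection at a new source tuple and returns this for chaining.
  KeyProjection wrap(Object[] partition) {
    this.wrapped = partition;
    return this;
  }

  int size() {
    return sourcePositions.length;
  }

  Object get(int pos) {
    return wrapped[sourcePositions[pos]];
  }
}

final class GroupingKeys {
  // Only reads from the already wrapped projection, so no task parameter is needed.
  static List<Object> copyKey(KeyProjection wrappedProjection) {
    List<Object> key = new ArrayList<>(wrappedProjection.size());
    for (int pos = 0; pos < wrappedProjection.size(); pos++) {
      key.add(wrappedProjection.get(pos));
    }
    return key;
  }
}
```

A caller would write something like `GroupingKeys.copyKey(projection.wrap(partition))` inside the loop over tasks. Copying the values matters because the projection is reused and re-wrapped for the next task.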





[GitHub] [iceberg] aokolnychyi merged pull request #6304: API, Core: Add groupingKey to ScanTaskGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged PR #6304:
URL: https://github.com/apache/iceberg/pull/6304

