Posted to issues@iceberg.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/05/10 00:21:21 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7565: Spark 3.4: Fixup for RewritePositionDeleteFilesSparkAction

aokolnychyi commented on code in PR #7565:
URL: https://github.com/apache/iceberg/pull/7565#discussion_r1189231104


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java:
##########
@@ -133,55 +133,69 @@ public RewritePositionDeleteFiles.Result execute() {
     }
   }
 
-  private Map<StructLike, List<List<PositionDeletesScanTask>>> planFileGroups() {
+  private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
-    CloseableIterable<PositionDeletesScanTask> scanTasks =
+
+    CloseableIterable<PositionDeletesScanTask> tasks =

Review Comment:
   optional: Shall we add `planTasks` to move task planning into a separate method?
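   A minimal, JDK-only sketch of that split, under stated assumptions: `Task` is a hypothetical stand-in for `PositionDeletesScanTask`, a `String` partition replaces the coerced `StructLike`, and a `Stream` stands in for `CloseableIterable`, so try-with-resources does the close that `planFileGroups()` currently does in a `finally` block. `planTasks()` is the name proposed in the comment; the class and `tasksByPartition()` are illustrative only.

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   // Hypothetical sketch: task planning moved into its own planTasks() method.
   public class PlanTasksSketch {

     // Stand-in for PositionDeletesScanTask: a partition key and a file path.
     public record Task(String partition, String path) {}

     private final List<Task> source;

     public PlanTasksSketch(List<Task> source) {
       this.source = source;
     }

     // Only planning happens here; in the action this would be the batch scan over
     // the POSITION_DELETES metadata table. A Stream stands in for CloseableIterable.
     Stream<Task> planTasks() {
       return source.stream();
     }

     // Grouping only consumes the planned tasks and closes them when done.
     public Map<String, List<Task>> tasksByPartition() {
       try (Stream<Task> tasks = planTasks()) {
         return tasks.collect(
             Collectors.groupingBy(Task::partition, LinkedHashMap::new, Collectors.toList()));
       }
     }

     public static void main(String[] args) {
       PlanTasksSketch action =
           new PlanTasksSketch(
               List.of(
                   new Task("p=1", "a.parquet"),
                   new Task("p=2", "b.parquet"),
                   new Task("p=1", "c.parquet")));
       System.out.println(action.tasksByPartition().keySet()); // [p=1, p=2]
     }
   }
   ```

   Keeping planning separate means the grouping code never needs to know how tasks were produced, only that the source must be closed.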



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java:
##########
@@ -133,55 +133,69 @@ public RewritePositionDeleteFiles.Result execute() {
     }
   }
 
-  private Map<StructLike, List<List<PositionDeletesScanTask>>> planFileGroups() {
+  private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
-    CloseableIterable<PositionDeletesScanTask> scanTasks =
+
+    CloseableIterable<PositionDeletesScanTask> tasks =
         CloseableIterable.transform(
             deletesTable.newBatchScan().ignoreResiduals().planFiles(),
             t -> (PositionDeletesScanTask) t);
 
     try {
       StructType partitionType = Partitioning.partitionType(table);
-      StructLikeMap<List<PositionDeletesScanTask>> filesByPartition =
-          StructLikeMap.create(partitionType);
-
-      for (PositionDeletesScanTask task : scanTasks) {
-        StructLike coerced = coercePartition(task, partitionType);
-
-        List<PositionDeletesScanTask> partitionTasks = filesByPartition.get(coerced);
-        if (partitionTasks == null) {
-          partitionTasks = Lists.newArrayList();
-        }
-        partitionTasks.add(task);
-        filesByPartition.put(coerced, partitionTasks);
-      }
-
-      StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition =
-          StructLikeMap.create(partitionType);
 
-      filesByPartition.forEach(
-          (partition, partitionTasks) -> {
-            Iterable<List<PositionDeletesScanTask>> plannedFileGroups =
-                rewriter.planFileGroups(partitionTasks);
-            List<List<PositionDeletesScanTask>> groups = ImmutableList.copyOf(plannedFileGroups);
-            if (groups.size() > 0) {
-              fileGroupsByPartition.put(partition, groups);
-            }
-          });
+      StructLikeMap<List<PositionDeletesScanTask>> filesPerPartition =

Review Comment:
   minor: `filesPerPartition` -> `filesByPartition`?
   I also wonder whether we should call it `tasksByPartition`, but that is up to you.



##########
core/src/main/java/org/apache/iceberg/util/StructLikeMap.java:
##########
@@ -44,6 +44,10 @@ private StructLikeMap(Types.StructType type) {
     this.wrappers = ThreadLocal.withInitial(() -> StructLikeWrapper.forType(type));
   }
 
+  public Types.StructType type() {

Review Comment:
   Would `keyType` as the method name be more accurate?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java:
##########
@@ -418,16 +431,16 @@ private String jobDesc(RewritePositionDeletesGroup group, RewriteExecutionContex
   }
 
   static class RewriteExecutionContext {
-    private final Map<StructLike, Integer> numGroupsByPartition;
+    private final StructLikeMap<Integer> numGroupsByPartition;
     private final int totalGroupCount;
     private final Map<StructLike, Integer> partitionIndexMap;
     private final AtomicInteger groupIndex;
 
-    RewriteExecutionContext(
-        Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition) {
-      this.numGroupsByPartition =
-          groupsByPartition.entrySet().stream()
-              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+    RewriteExecutionContext(StructLikeMap<List<List<PositionDeletesScanTask>>> groupsByPartition) {
+      this.numGroupsByPartition = StructLikeMap.create(groupsByPartition.type());

Review Comment:
   This would also become a one-liner with `transformValues`:
   
   ```
   this.numGroupsByPartition = groupsByPartition.transformValues(List::size);
   ```
   
   If we decide not to add `transformValues`, can we add a helper method?
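   If `transformValues` is not added, the helper could look roughly like this JDK-only sketch. A `LinkedHashMap` stands in for `StructLikeMap` (the real helper would create its result with `StructLikeMap.create(groupsByPartition.type())`, as in the diff above), and the class and method names are hypothetical.

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical helper: count the planned file groups in each partition.
   public class GroupCounts {

     // Generic over the key and task types so it fits any Map<K, List<List<T>>>.
     public static <K, T> Map<K, Integer> numGroupsByPartition(
         Map<K, List<List<T>>> groupsByPartition) {
       // LinkedHashMap stands in for an empty StructLikeMap with the same key type.
       Map<K, Integer> counts = new LinkedHashMap<>();
       groupsByPartition.forEach((partition, groups) -> counts.put(partition, groups.size()));
       return counts;
     }

     public static void main(String[] args) {
       Map<String, List<List<String>>> groups = new LinkedHashMap<>();
       groups.put("p=1", List.of(List.of("a", "b"), List.of("c")));
       groups.put("p=2", List.of());
       System.out.println(numGroupsByPartition(groups)); // {p=1=2, p=2=0}
     }
   }
   ```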



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java:
##########
@@ -298,7 +311,7 @@ private Result doExecuteWithPartialProgress(
         .run(fileGroup -> commitService.offer(rewriteDeleteFiles(ctx, fileGroup)));
     rewriteService.shutdown();
 
-    // Stop Commit service
+    // stop commit service

Review Comment:
   Can we also slightly restructure `toGroupStream`? Spotless formatted it awkwardly. Should it also be private?
   
   ```
     private Stream<RewritePositionDeletesGroup> toGroupStream(
         RewriteExecutionContext ctx,
         StructLikeMap<List<List<PositionDeletesScanTask>>> groupsByPartition) {
   
       return groupsByPartition.entrySet().stream()
           .filter(entry -> !entry.getValue().isEmpty())
           .flatMap(
               entry -> {
                 StructLike partition = entry.getKey();
                 List<List<PositionDeletesScanTask>> fileGroups = entry.getValue();
                 return fileGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
               })
           .sorted(RewritePositionDeletesGroup.comparator(rewriteJobOrder));
     }
   
     private RewritePositionDeletesGroup newRewriteGroup(
         RewriteExecutionContext ctx, StructLike partition, List<PositionDeletesScanTask> tasks) {
   
       int globalIndex = ctx.currentGlobalIndex();
       int partitionIndex = ctx.currentPartitionIndex(partition);
       FileGroupInfo info =
           ImmutableRewritePositionDeleteFiles.FileGroupInfo.builder()
               .globalIndex(globalIndex)
               .partitionIndex(partitionIndex)
               .partition(partition)
               .build();
       return new RewritePositionDeletesGroup(info, tasks);
     }
   
   ```
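   The same flatten-and-sort shape can be tried without Spark or Iceberg. In this hypothetical sketch, `Group` stands in for `RewritePositionDeletesGroup`, a `String` partition replaces `StructLike`, `Context` is a simplified counter-based stand-in for `RewriteExecutionContext`, and sorting by descending group size is only an example ordering, not the action's `rewriteJobOrder` comparator. Partitions whose group list is empty are dropped before flattening.

   ```java
   import java.util.Comparator;
   import java.util.HashMap;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   // Hypothetical JDK-only sketch of toGroupStream/newRewriteGroup.
   public class GroupStreamSketch {

     // Stand-in for RewritePositionDeletesGroup plus its FileGroupInfo indices.
     public record Group(int globalIndex, int partitionIndex, String partition, List<String> tasks) {}

     // Simplified stand-in for RewriteExecutionContext: 1-based running counters.
     static class Context {
       private final AtomicInteger groupIndex = new AtomicInteger(1);
       private final Map<String, Integer> partitionIndex = new HashMap<>();

       int currentGlobalIndex() {
         return groupIndex.getAndIncrement();
       }

       int currentPartitionIndex(String partition) {
         // merge returns the new value: 1 on first use, then 2, 3, ...
         return partitionIndex.merge(partition, 1, Integer::sum);
       }
     }

     static Stream<Group> toGroupStream(Context ctx, Map<String, List<List<String>>> groupsByPartition) {
       return groupsByPartition.entrySet().stream()
           // Drop partitions that ended up with no file groups.
           .filter(entry -> !entry.getValue().isEmpty())
           .flatMap(
               entry -> {
                 String partition = entry.getKey();
                 return entry.getValue().stream().map(tasks -> newGroup(ctx, partition, tasks));
               })
           // Example ordering only: larger groups first.
           .sorted(Comparator.comparingInt((Group g) -> g.tasks().size()).reversed());
     }

     static Group newGroup(Context ctx, String partition, List<String> tasks) {
       int globalIndex = ctx.currentGlobalIndex();
       int partitionIndex = ctx.currentPartitionIndex(partition);
       return new Group(globalIndex, partitionIndex, partition, tasks);
     }

     public static void main(String[] args) {
       Map<String, List<List<String>>> groups = new LinkedHashMap<>();
       groups.put("p=1", List.of(List.of("a"), List.of("b", "c")));
       groups.put("p=2", List.of());
       groups.put("p=3", List.of(List.of("d", "e", "f")));
       List<Group> result = toGroupStream(new Context(), groups).collect(Collectors.toList());
       result.forEach(System.out::println);
     }
   }
   ```

   Because `sorted` consumes the whole upstream before emitting anything, the indices in this sketch reflect the order in which groups were created, not their final sorted position.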



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java:
##########
@@ -133,55 +133,69 @@ public RewritePositionDeleteFiles.Result execute() {
     }
   }
 
-  private Map<StructLike, List<List<PositionDeletesScanTask>>> planFileGroups() {
+  private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
-    CloseableIterable<PositionDeletesScanTask> scanTasks =
+
+    CloseableIterable<PositionDeletesScanTask> tasks =
         CloseableIterable.transform(
             deletesTable.newBatchScan().ignoreResiduals().planFiles(),
             t -> (PositionDeletesScanTask) t);
 
     try {
       StructType partitionType = Partitioning.partitionType(table);
-      StructLikeMap<List<PositionDeletesScanTask>> filesByPartition =
-          StructLikeMap.create(partitionType);
-
-      for (PositionDeletesScanTask task : scanTasks) {
-        StructLike coerced = coercePartition(task, partitionType);
-
-        List<PositionDeletesScanTask> partitionTasks = filesByPartition.get(coerced);
-        if (partitionTasks == null) {
-          partitionTasks = Lists.newArrayList();
-        }
-        partitionTasks.add(task);
-        filesByPartition.put(coerced, partitionTasks);
-      }
-
-      StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition =
-          StructLikeMap.create(partitionType);
 
-      filesByPartition.forEach(
-          (partition, partitionTasks) -> {
-            Iterable<List<PositionDeletesScanTask>> plannedFileGroups =
-                rewriter.planFileGroups(partitionTasks);
-            List<List<PositionDeletesScanTask>> groups = ImmutableList.copyOf(plannedFileGroups);
-            if (groups.size() > 0) {
-              fileGroupsByPartition.put(partition, groups);
-            }
-          });
+      StructLikeMap<List<PositionDeletesScanTask>> filesPerPartition =
+          filesByPartition(partitionType, tasks);
 
-      return fileGroupsByPartition;
+      return fileGroupsByPartition(partitionType, filesPerPartition);
     } finally {
       try {
-        scanTasks.close();
+        tasks.close();
       } catch (IOException io) {
         LOG.error("Cannot properly close file iterable while planning for rewrite", io);
       }
     }
   }
 
-  @VisibleForTesting
-  RewritePositionDeletesGroup rewriteDeleteFiles(
+  private StructLikeMap<List<PositionDeletesScanTask>> filesByPartition(
+      StructType partitionType, Iterable<PositionDeletesScanTask> tasks) {
+    StructLikeMap<List<PositionDeletesScanTask>> filesByPartition =
+        StructLikeMap.create(partitionType);
+
+    for (PositionDeletesScanTask task : tasks) {
+      StructLike coerced = coercePartition(task, partitionType);
+
+      List<PositionDeletesScanTask> partitionTasks = filesByPartition.get(coerced);
+      if (partitionTasks == null) {
+        partitionTasks = Lists.newArrayList();
+      }
+      partitionTasks.add(task);
+      filesByPartition.put(coerced, partitionTasks);
+    }
+
+    return filesByPartition;
+  }
+
+  private StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition(
+      StructType partitionType, StructLikeMap<List<PositionDeletesScanTask>> filesByPartition) {
+    StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition =
+        StructLikeMap.create(partitionType);
+
+    filesByPartition.forEach(

Review Comment:
   question: It seems we would benefit from an easy way to transform `StructLikeMap` values. This pattern appears here and below, where we construct the context. Would adding something like this to `StructLikeMap` be overkill?
   
   ```
   public <U> StructLikeMap<U> transformValues(Function<T, U> func) {
     StructLikeMap<U> map = create(type);
   
     for (Entry<StructLikeWrapper, T> entry : wrapperMap.entrySet()) {
       U newValue = func.apply(entry.getValue());
       map.put(entry.getKey().get(), newValue);
     }
   
     return map;
   }
   ```
   
   Then the entire method `fileGroupsByPartition` can be expressed in one line:
   
   ```
   tasksByPartition.transformValues(this::planFileGroups);
   
   ...
   
   private List<List<PositionDeletesScanTask>> planFileGroups(List<PositionDeletesScanTask> tasks) {
     return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
   }
   ```
   
   And `RewriteExecutionContext` would call:
   
   ```
   this.numGroupsByPartition = groupsByPartition.transformValues(List::size);
   ```
   
   We would need to filter out empty file group lists, but we can do that in `toGroupStream`. I am not sure it is worth it, so I'll leave it up to you.
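   To see what `transformValues` would do in isolation, here is a JDK-only sketch. `KeyedMap` is a hypothetical stand-in for `StructLikeMap`: a `String` label replaces `Types.StructType` and a `LinkedHashMap` replaces the wrapper map. `transformValues` builds a new map with the same key type and keys and applies the function to each value; zero-size entries survive the transform and would still need filtering downstream.

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Function;

   // Hypothetical stand-in for StructLikeMap, used only to illustrate transformValues.
   public class KeyedMap<T> {
     private final String keyType; // stands in for Types.StructType
     private final Map<String, T> entries = new LinkedHashMap<>(); // stands in for wrapperMap

     private KeyedMap(String keyType) {
       this.keyType = keyType;
     }

     public static <T> KeyedMap<T> create(String keyType) {
       return new KeyedMap<>(keyType);
     }

     public String keyType() {
       return keyType;
     }

     public void put(String key, T value) {
       entries.put(key, value);
     }

     public T get(String key) {
       return entries.get(key);
     }

     public int size() {
       return entries.size();
     }

     // New map with the same key type and keys; each value is replaced by func(value).
     public <U> KeyedMap<U> transformValues(Function<T, U> func) {
       KeyedMap<U> result = create(keyType);
       for (Map.Entry<String, T> entry : entries.entrySet()) {
         result.put(entry.getKey(), func.apply(entry.getValue()));
       }
       return result;
     }

     public static void main(String[] args) {
       KeyedMap<List<List<String>>> groupsByPartition = KeyedMap.create("struct<p: int>");
       groupsByPartition.put("p=1", List.of(List.of("a"), List.of("b", "c")));
       groupsByPartition.put("p=2", List.of());

       KeyedMap<Integer> numGroupsByPartition = groupsByPartition.transformValues(List::size);
       System.out.println(numGroupsByPartition.get("p=1")); // 2
       System.out.println(numGroupsByPartition.get("p=2")); // 0
     }
   }
   ```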



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org