You are viewing a plain-text version of this content. The canonical link to the original page (a "here" hyperlink) was not preserved in this copy.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/05/16 23:17:12 UTC

[iceberg] branch master updated: Spark 3.4: Fixup for RewritePositionDeleteFilesSparkAction (#7389) (#7565)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 890ca0ef73 Spark 3.4: Fixup for RewritePositionDeleteFilesSparkAction (#7389) (#7565)
890ca0ef73 is described below

commit 890ca0ef731cb38706b5deadece4d34bd9806e18
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Tue May 16 16:17:03 2023 -0700

    Spark 3.4: Fixup for RewritePositionDeleteFilesSparkAction (#7389) (#7565)
---
 .../org/apache/iceberg/util/StructLikeMap.java     |   7 +
 ... => RewritePositionDeleteFilesSparkAction.java} | 166 +++++++++++----------
 .../apache/iceberg/spark/actions/SparkActions.java |   4 +-
 3 files changed, 93 insertions(+), 84 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java b/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
index 6af658d130..58bd030413 100644
--- a/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
+++ b/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.Function;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -171,4 +172,10 @@ public class StructLikeMap<T> extends AbstractMap<StructLike, T> implements Map<
       throw new UnsupportedOperationException("Does not support setValue.");
     }
   }
+
+  public <U> StructLikeMap<U> transformValues(Function<T, U> func) {
+    StructLikeMap<U> result = create(type);
+    wrapperMap.forEach((key, value) -> result.put(key.get(), func.apply(value)));
+    return result;
+  }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
similarity index 79%
rename from spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteSparkAction.java
rename to spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index 87d945e7ae..bd8610ea73 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -47,7 +47,6 @@ import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -68,11 +67,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Spark implementation of {@link RewritePositionDeleteFiles}. */
-public class RewritePositionDeleteSparkAction
-    extends BaseSnapshotUpdateSparkAction<RewritePositionDeleteSparkAction>
+public class RewritePositionDeleteFilesSparkAction
+    extends BaseSnapshotUpdateSparkAction<RewritePositionDeleteFilesSparkAction>
     implements RewritePositionDeleteFiles {
 
-  private static final Logger LOG = LoggerFactory.getLogger(RewritePositionDeleteSparkAction.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RewritePositionDeleteFilesSparkAction.class);
   private static final Set<String> VALID_OPTIONS =
       ImmutableSet.of(
           MAX_CONCURRENT_FILE_GROUP_REWRITES,
@@ -90,19 +90,19 @@ public class RewritePositionDeleteSparkAction
   private boolean partialProgressEnabled;
   private RewriteJobOrder rewriteJobOrder;
 
-  RewritePositionDeleteSparkAction(SparkSession spark, Table table) {
+  RewritePositionDeleteFilesSparkAction(SparkSession spark, Table table) {
     super(spark);
     this.table = table;
     this.rewriter = new SparkBinPackPositionDeletesRewriter(spark(), table);
   }
 
   @Override
-  protected RewritePositionDeleteSparkAction self() {
+  protected RewritePositionDeleteFilesSparkAction self() {
     return this;
   }
 
   @Override
-  public RewritePositionDeleteSparkAction filter(Expression expression) {
+  public RewritePositionDeleteFilesSparkAction filter(Expression expression) {
     throw new UnsupportedOperationException("Regular filters not supported yet.");
   }
 
@@ -115,7 +115,7 @@ public class RewritePositionDeleteSparkAction
 
     validateAndInitOptions();
 
-    Map<StructLike, List<List<PositionDeletesScanTask>>> fileGroupsByPartition = planFileGroups();
+    StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition = planFileGroups();
     RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
 
     if (ctx.totalGroupCount() == 0) {
@@ -125,63 +125,68 @@ public class RewritePositionDeleteSparkAction
 
     Stream<RewritePositionDeletesGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
 
-    RewritePositionDeletesCommitManager commitManager = commitManager();
     if (partialProgressEnabled) {
-      return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
+      return doExecuteWithPartialProgress(ctx, groupStream, commitManager());
     } else {
-      return doExecute(ctx, groupStream, commitManager);
+      return doExecute(ctx, groupStream, commitManager());
     }
   }
 
-  private Map<StructLike, List<List<PositionDeletesScanTask>>> planFileGroups() {
-    Table deletesTable =
-        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
-    CloseableIterable<PositionDeletesScanTask> scanTasks =
-        CloseableIterable.transform(
-            deletesTable.newBatchScan().ignoreResiduals().planFiles(),
-            t -> (PositionDeletesScanTask) t);
+  private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
+    CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles();
 
     try {
       StructType partitionType = Partitioning.partitionType(table);
-      StructLikeMap<List<PositionDeletesScanTask>> filesByPartition =
-          StructLikeMap.create(partitionType);
-
-      for (PositionDeletesScanTask task : scanTasks) {
-        StructLike coerced = coercePartition(task, partitionType);
-
-        List<PositionDeletesScanTask> partitionTasks = filesByPartition.get(coerced);
-        if (partitionTasks == null) {
-          partitionTasks = Lists.newArrayList();
-        }
-        partitionTasks.add(task);
-        filesByPartition.put(coerced, partitionTasks);
-      }
-
-      StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition =
-          StructLikeMap.create(partitionType);
-
-      filesByPartition.forEach(
-          (partition, partitionTasks) -> {
-            Iterable<List<PositionDeletesScanTask>> plannedFileGroups =
-                rewriter.planFileGroups(partitionTasks);
-            List<List<PositionDeletesScanTask>> groups = ImmutableList.copyOf(plannedFileGroups);
-            if (groups.size() > 0) {
-              fileGroupsByPartition.put(partition, groups);
-            }
-          });
-
-      return fileGroupsByPartition;
+      StructLikeMap<List<PositionDeletesScanTask>> fileTasksByPartition =
+          groupByPartition(partitionType, fileTasks);
+      return fileGroupsByPartition(fileTasksByPartition);
     } finally {
       try {
-        scanTasks.close();
+        fileTasks.close();
       } catch (IOException io) {
         LOG.error("Cannot properly close file iterable while planning for rewrite", io);
       }
     }
   }
 
-  @VisibleForTesting
-  RewritePositionDeletesGroup rewriteDeleteFiles(
+  private CloseableIterable<PositionDeletesScanTask> planFiles() {
+    Table deletesTable =
+        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
+
+    return CloseableIterable.transform(
+        deletesTable.newBatchScan().ignoreResiduals().planFiles(),
+        task -> (PositionDeletesScanTask) task);
+  }
+
+  private StructLikeMap<List<PositionDeletesScanTask>> groupByPartition(
+      StructType partitionType, Iterable<PositionDeletesScanTask> tasks) {
+    StructLikeMap<List<PositionDeletesScanTask>> filesByPartition =
+        StructLikeMap.create(partitionType);
+
+    for (PositionDeletesScanTask task : tasks) {
+      StructLike coerced = coercePartition(task, partitionType);
+
+      List<PositionDeletesScanTask> partitionTasks = filesByPartition.get(coerced);
+      if (partitionTasks == null) {
+        partitionTasks = Lists.newArrayList();
+      }
+      partitionTasks.add(task);
+      filesByPartition.put(coerced, partitionTasks);
+    }
+
+    return filesByPartition;
+  }
+
+  private StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition(
+      StructLikeMap<List<PositionDeletesScanTask>> filesByPartition) {
+    return filesByPartition.transformValues(this::planFileGroups);
+  }
+
+  private List<List<PositionDeletesScanTask>> planFileGroups(List<PositionDeletesScanTask> tasks) {
+    return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
+  }
+
+  private RewritePositionDeletesGroup rewriteDeleteFiles(
       RewriteExecutionContext ctx, RewritePositionDeletesGroup fileGroup) {
     String desc = jobDesc(fileGroup, ctx);
     Set<DeleteFile> addedFiles =
@@ -204,8 +209,7 @@ public class RewritePositionDeleteSparkAction
                     .build()));
   }
 
-  @VisibleForTesting
-  RewritePositionDeletesCommitManager commitManager() {
+  private RewritePositionDeletesCommitManager commitManager() {
     return new RewritePositionDeletesCommitManager(table);
   }
 
@@ -282,12 +286,12 @@ public class RewritePositionDeleteSparkAction
       RewritePositionDeletesCommitManager commitManager) {
     ExecutorService rewriteService = rewriteService();
 
-    // Start Commit Service
+    // start commit service
     int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
     CommitService commitService = commitManager.service(groupsPerCommit);
     commitService.start();
 
-    // Start rewrite tasks
+    // start rewrite tasks
     Tasks.foreach(groupStream)
         .suppressFailureWhenFinished()
         .executeWith(rewriteService)
@@ -298,7 +302,7 @@ public class RewritePositionDeleteSparkAction
         .run(fileGroup -> commitService.offer(rewriteDeleteFiles(ctx, fileGroup)));
     rewriteService.shutdown();
 
-    // Stop Commit service
+    // stop commit service
     commitService.close();
     List<RewritePositionDeletesGroup> commitResults = commitService.results();
     if (commitResults.size() == 0) {
@@ -319,31 +323,31 @@ public class RewritePositionDeleteSparkAction
         .build();
   }
 
-  Stream<RewritePositionDeletesGroup> toGroupStream(
+  private Stream<RewritePositionDeletesGroup> toGroupStream(
       RewriteExecutionContext ctx,
       Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition) {
-    Stream<RewritePositionDeletesGroup> rewriteFileGroupStream =
-        groupsByPartition.entrySet().stream()
-            .flatMap(
-                e -> {
-                  StructLike partition = e.getKey();
-                  List<List<PositionDeletesScanTask>> scanGroups = e.getValue();
-                  return scanGroups.stream()
-                      .map(
-                          tasks -> {
-                            int globalIndex = ctx.currentGlobalIndex();
-                            int partitionIndex = ctx.currentPartitionIndex(partition);
-                            FileGroupInfo info =
-                                ImmutableRewritePositionDeleteFiles.FileGroupInfo.builder()
-                                    .globalIndex(globalIndex)
-                                    .partitionIndex(partitionIndex)
-                                    .partition(partition)
-                                    .build();
-                            return new RewritePositionDeletesGroup(info, tasks);
-                          });
-                });
-
-    return rewriteFileGroupStream.sorted(RewritePositionDeletesGroup.comparator(rewriteJobOrder));
+    return groupsByPartition.entrySet().stream()
+        .filter(e -> e.getValue().size() != 0)
+        .flatMap(
+            e -> {
+              StructLike partition = e.getKey();
+              List<List<PositionDeletesScanTask>> scanGroups = e.getValue();
+              return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
+            })
+        .sorted(RewritePositionDeletesGroup.comparator(rewriteJobOrder));
+  }
+
+  private RewritePositionDeletesGroup newRewriteGroup(
+      RewriteExecutionContext ctx, StructLike partition, List<PositionDeletesScanTask> tasks) {
+    int globalIndex = ctx.currentGlobalIndex();
+    int partitionIndex = ctx.currentPartitionIndex(partition);
+    FileGroupInfo info =
+        ImmutableRewritePositionDeleteFiles.FileGroupInfo.builder()
+            .globalIndex(globalIndex)
+            .partitionIndex(partitionIndex)
+            .partition(partition)
+            .build();
+    return new RewritePositionDeletesGroup(info, tasks);
   }
 
   private void validateAndInitOptions() {
@@ -418,16 +422,14 @@ public class RewritePositionDeleteSparkAction
   }
 
   static class RewriteExecutionContext {
-    private final Map<StructLike, Integer> numGroupsByPartition;
+    private final StructLikeMap<Integer> numGroupsByPartition;
     private final int totalGroupCount;
     private final Map<StructLike, Integer> partitionIndexMap;
     private final AtomicInteger groupIndex;
 
     RewriteExecutionContext(
-        Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition) {
-      this.numGroupsByPartition =
-          groupsByPartition.entrySet().stream()
-              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+        StructLikeMap<List<List<PositionDeletesScanTask>>> fileTasksByPartition) {
+      this.numGroupsByPartition = fileTasksByPartition.transformValues(List::size);
       this.totalGroupCount = numGroupsByPartition.values().stream().reduce(Integer::sum).orElse(0);
       this.partitionIndexMap = Maps.newConcurrentMap();
       this.groupIndex = new AtomicInteger(1);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index 64f1d91a18..fb67ded96e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -93,7 +93,7 @@ public class SparkActions implements ActionsProvider {
   }
 
   @Override
-  public RewritePositionDeleteSparkAction rewritePositionDeletes(Table table) {
-    return new RewritePositionDeleteSparkAction(spark, table);
+  public RewritePositionDeleteFilesSparkAction rewritePositionDeletes(Table table) {
+    return new RewritePositionDeleteFilesSparkAction(spark, table);
   }
 }