Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/05/19 23:09:45 UTC

[iceberg] branch master updated: Spark 3.4: Harmonize RewriteDataFilesSparkAction code with RewritePositionDeleteFilesSparkAction (#7630)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ace681824 Spark 3.4: Harmonize RewriteDataFilesSparkAction code with RewritePositionDeleteFilesSparkAction (#7630)
8ace681824 is described below

commit 8ace681824c1c11ac06c3e8d2f86be8eb2b75600
Author: Ajantha Bhat <aj...@gmail.com>
AuthorDate: Sat May 20 04:39:39 2023 +0530

    Spark 3.4: Harmonize RewriteDataFilesSparkAction code with RewritePositionDeleteFilesSparkAction (#7630)
---
 .../apache/iceberg/actions/RewriteFileGroup.java   |  17 +++
 .../spark/actions/RewriteDataFilesSparkAction.java | 163 +++++++++------------
 .../spark/actions/TestRewriteDataFilesAction.java  |   3 +-
 3 files changed, 91 insertions(+), 92 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
index bb62f79711..f816b5d7a4 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
@@ -19,11 +19,13 @@
 package org.apache.iceberg.actions;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -92,4 +94,19 @@ public class RewriteFileGroup {
   public int numFiles() {
     return fileScanTasks.size();
   }
+
+  public static Comparator<RewriteFileGroup> comparator(RewriteJobOrder rewriteJobOrder) {
+    switch (rewriteJobOrder) {
+      case BYTES_ASC:
+        return Comparator.comparing(RewriteFileGroup::sizeInBytes);
+      case BYTES_DESC:
+        return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
+      case FILES_ASC:
+        return Comparator.comparing(RewriteFileGroup::numFiles);
+      case FILES_DESC:
+        return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
+      default:
+        return (fileGroupOne, fileGroupTwo) -> 0;
+    }
+  }
 }
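
For context, a minimal sketch of how a caller can use the newly hoisted
comparator; the stream of groups and the job order are assumed to be in scope,
as they are in the Spark rewrite actions:

    import java.util.stream.Stream;
    import org.apache.iceberg.RewriteJobOrder;
    import org.apache.iceberg.actions.RewriteFileGroup;

    // sketch: order pending groups by the configured job order before scheduling
    static Stream<RewriteFileGroup> ordered(
        Stream<RewriteFileGroup> groups, RewriteJobOrder order) {
      return groups.sorted(RewriteFileGroup.comparator(order));
    }
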
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 658d3a9279..6b5628a1f4 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -84,6 +83,9 @@ public class RewriteDataFilesSparkAction
           USE_STARTING_SEQUENCE_NUMBER,
           REWRITE_JOB_ORDER);
 
+  private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
+      ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+
   private final Table table;
 
   private Expression filter = Expressions.alwaysTrue();
@@ -147,7 +149,7 @@ public class RewriteDataFilesSparkAction
   @Override
   public RewriteDataFiles.Result execute() {
     if (table.currentSnapshot() == null) {
-      return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+      return EMPTY_RESULT;
     }
 
     long startingSnapshotId = table.currentSnapshot().snapshotId();
@@ -159,26 +161,25 @@ public class RewriteDataFilesSparkAction
 
     validateAndInitOptions();
 
-    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+    StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
         planFileGroups(startingSnapshotId);
     RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
 
     if (ctx.totalGroupCount() == 0) {
       LOG.info("Nothing found to rewrite in {}", table.name());
-      return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+      return EMPTY_RESULT;
     }
 
     Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
 
-    RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId);
     if (partialProgressEnabled) {
-      return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
+      return doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId));
     } else {
-      return doExecute(ctx, groupStream, commitManager);
+      return doExecute(ctx, groupStream, commitManager(startingSnapshotId));
     }
   }
 
-  Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
+  StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
     CloseableIterable<FileScanTask> fileScanTasks =
         table
             .newScan()
@@ -189,43 +190,9 @@ public class RewriteDataFilesSparkAction
 
     try {
       StructType partitionType = table.spec().partitionType();
-      StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
-      StructLike emptyStruct = GenericRecord.create(partitionType);
-
-      fileScanTasks.forEach(
-          task -> {
-            // If a task uses an incompatible partition spec the data inside could contain values
-            // which
-            // belong to multiple partitions in the current spec. Treating all such files as
-            // un-partitioned and
-            // grouping them together helps to minimize new files made.
-            StructLike taskPartition =
-                task.file().specId() == table.spec().specId()
-                    ? task.file().partition()
-                    : emptyStruct;
-
-            List<FileScanTask> files = filesByPartition.get(taskPartition);
-            if (files == null) {
-              files = Lists.newArrayList();
-            }
-
-            files.add(task);
-            filesByPartition.put(taskPartition, files);
-          });
-
-      StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
-          StructLikeMap.create(partitionType);
-
-      filesByPartition.forEach(
-          (partition, tasks) -> {
-            Iterable<List<FileScanTask>> plannedFileGroups = rewriter.planFileGroups(tasks);
-            List<List<FileScanTask>> fileGroups = ImmutableList.copyOf(plannedFileGroups);
-            if (fileGroups.size() > 0) {
-              fileGroupsByPartition.put(partition, fileGroups);
-            }
-          });
-
-      return fileGroupsByPartition;
+      StructLikeMap<List<FileScanTask>> filesByPartition =
+          groupByPartition(partitionType, fileScanTasks);
+      return fileGroupsByPartition(filesByPartition);
     } finally {
       try {
         fileScanTasks.close();
@@ -235,6 +202,38 @@ public class RewriteDataFilesSparkAction
     }
   }
 
+  private StructLikeMap<List<FileScanTask>> groupByPartition(
+      StructType partitionType, Iterable<FileScanTask> tasks) {
+    StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
+    StructLike emptyStruct = GenericRecord.create(partitionType);
+
+    for (FileScanTask task : tasks) {
+      // If a task uses an incompatible partition spec the data inside could contain values
+      // which belong to multiple partitions in the current spec. Treating all such files as
+      // un-partitioned and grouping them together helps to minimize new files made.
+      StructLike taskPartition =
+          task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;
+
+      List<FileScanTask> files = filesByPartition.get(taskPartition);
+      if (files == null) {
+        files = Lists.newArrayList();
+      }
+
+      files.add(task);
+      filesByPartition.put(taskPartition, files);
+    }
+    return filesByPartition;
+  }
+
+  private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(
+      StructLikeMap<List<FileScanTask>> filesByPartition) {
+    return filesByPartition.transformValues(this::planFileGroups);
+  }
+
+  private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
+    return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
+  }
+
   @VisibleForTesting
   RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
     String desc = jobDesc(fileGroup, ctx);
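
Both new helpers lean on StructLikeMap.transformValues, which this diff also
uses in RewriteExecutionContext below. A hedged sketch of the pattern, with
the maps assumed to be in scope:

    // transformValues keeps the partition keys and maps each value through a
    // function, returning a new StructLikeMap
    StructLikeMap<List<List<FileScanTask>>> groupsByPartition =
        filesByPartition.transformValues(this::planFileGroups);
    StructLikeMap<Integer> groupCounts = groupsByPartition.transformValues(List::size);
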
@@ -300,7 +299,7 @@ public class RewriteDataFilesSparkAction
 
       Tasks.foreach(rewrittenGroups)
           .suppressFailureWhenFinished()
-          .run(group -> commitManager.abortFileGroup(group));
+          .run(commitManager::abortFileGroup);
       throw e;
     } finally {
       rewriteService.shutdown();
@@ -332,14 +331,14 @@ public class RewriteDataFilesSparkAction
       RewriteDataFilesCommitManager commitManager) {
     ExecutorService rewriteService = rewriteService();
 
-    // Start Commit Service
+    // start commit service
     int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
     RewriteDataFilesCommitManager.CommitService commitService =
         commitManager.service(groupsPerCommit);
     commitService.start();
 
     Collection<FileGroupFailureResult> rewriteFailures = new ConcurrentLinkedQueue<>();
-    // Start rewrite tasks
+    // start rewrite tasks
     Tasks.foreach(groupStream)
         .suppressFailureWhenFinished()
         .executeWith(rewriteService)
@@ -356,7 +355,7 @@ public class RewriteDataFilesSparkAction
         .run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
     rewriteService.shutdown();
 
-    // Stop Commit service
+    // stop commit service
     commitService.close();
     List<RewriteFileGroup> commitResults = commitService.results();
     if (commitResults.size() == 0) {
@@ -377,45 +376,29 @@ public class RewriteDataFilesSparkAction
   }
 
   Stream<RewriteFileGroup> toGroupStream(
-      RewriteExecutionContext ctx,
-      Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-    Stream<RewriteFileGroup> rewriteFileGroupStream =
-        fileGroupsByPartition.entrySet().stream()
-            .flatMap(
-                e -> {
-                  StructLike partition = e.getKey();
-                  List<List<FileScanTask>> fileGroups = e.getValue();
-                  return fileGroups.stream()
-                      .map(
-                          tasks -> {
-                            int globalIndex = ctx.currentGlobalIndex();
-                            int partitionIndex = ctx.currentPartitionIndex(partition);
-                            FileGroupInfo info =
-                                ImmutableRewriteDataFiles.FileGroupInfo.builder()
-                                    .globalIndex(globalIndex)
-                                    .partitionIndex(partitionIndex)
-                                    .partition(partition)
-                                    .build();
-                            return new RewriteFileGroup(info, tasks);
-                          });
-                });
-
-    return rewriteFileGroupStream.sorted(rewriteGroupComparator());
+      RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
+    return groupsByPartition.entrySet().stream()
+        .filter(e -> e.getValue().size() != 0)
+        .flatMap(
+            e -> {
+              StructLike partition = e.getKey();
+              List<List<FileScanTask>> scanGroups = e.getValue();
+              return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
+            })
+        .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
   }
 
-  private Comparator<RewriteFileGroup> rewriteGroupComparator() {
-    switch (rewriteJobOrder) {
-      case BYTES_ASC:
-        return Comparator.comparing(RewriteFileGroup::sizeInBytes);
-      case BYTES_DESC:
-        return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
-      case FILES_ASC:
-        return Comparator.comparing(RewriteFileGroup::numFiles);
-      case FILES_DESC:
-        return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
-      default:
-        return (fileGroupOne, fileGroupTwo) -> 0;
-    }
+  private RewriteFileGroup newRewriteGroup(
+      RewriteExecutionContext ctx, StructLike partition, List<FileScanTask> tasks) {
+    int globalIndex = ctx.currentGlobalIndex();
+    int partitionIndex = ctx.currentPartitionIndex(partition);
+    FileGroupInfo info =
+        ImmutableRewriteDataFiles.FileGroupInfo.builder()
+            .globalIndex(globalIndex)
+            .partitionIndex(partitionIndex)
+            .partition(partition)
+            .build();
+    return new RewriteFileGroup(info, tasks);
   }
 
   void validateAndInitOptions() {
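
The reshaped toGroupStream above now skips partitions whose planned group list
is empty and delegates group construction to newRewriteGroup. A minimal sketch
of the resulting pipeline, assuming ctx, groupsByPartition, and rewriteJobOrder
are in scope:

    // flatten per-partition group lists into RewriteFileGroups, then sort by
    // the configured job order
    Stream<RewriteFileGroup> stream =
        groupsByPartition.entrySet().stream()
            .filter(e -> !e.getValue().isEmpty())
            .flatMap(e -> e.getValue().stream()
                .map(tasks -> newRewriteGroup(ctx, e.getKey(), tasks)))
            .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
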
@@ -495,15 +478,13 @@ public class RewriteDataFilesSparkAction
 
   @VisibleForTesting
   static class RewriteExecutionContext {
-    private final Map<StructLike, Integer> numGroupsByPartition;
+    private final StructLikeMap<Integer> numGroupsByPartition;
     private final int totalGroupCount;
     private final Map<StructLike, Integer> partitionIndexMap;
     private final AtomicInteger groupIndex;
 
-    RewriteExecutionContext(Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-      this.numGroupsByPartition =
-          fileGroupsByPartition.entrySet().stream()
-              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+    RewriteExecutionContext(StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition) {
+      this.numGroupsByPartition = fileGroupsByPartition.transformValues(List::size);
       this.totalGroupCount = numGroupsByPartition.values().stream().reduce(Integer::sum).orElse(0);
       this.partitionIndexMap = Maps.newConcurrentMap();
       this.groupIndex = new AtomicInteger(1);
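
With a StructLikeMap in hand, the execution context derives its per-partition
group counts from the same transformValues call; a sketch of the equivalent
computation, with fileGroupsByPartition assumed in scope:

    // count planned groups per partition, then total them for progress tracking
    StructLikeMap<Integer> numGroupsByPartition =
        fileGroupsByPartition.transformValues(List::size);
    int totalGroupCount =
        numGroupsByPartition.values().stream().reduce(Integer::sum).orElse(0);
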
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 76b8f58d9a..bf4bef74c3 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -100,6 +100,7 @@ import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeMap;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.internal.SQLConf;
@@ -1390,7 +1391,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
 
   private Stream<RewriteFileGroup> toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) {
     rewrite.validateAndInitOptions();
-    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+    StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
         rewrite.planFileGroups(table.currentSnapshot().snapshotId());
 
     return rewrite.toGroupStream(