Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/05/23 18:13:11 UTC

[iceberg] branch master updated: Spark 3.3: Harmonize RewriteDataFilesSparkAction (#7676)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 11eed7180b Spark 3.3: Harmonize RewriteDataFilesSparkAction (#7676)
11eed7180b is described below

commit 11eed7180b3eb39bca795840dfecc0c7a880d2f1
Author: Ajantha Bhat <aj...@gmail.com>
AuthorDate: Tue May 23 23:43:06 2023 +0530

    Spark 3.3: Harmonize RewriteDataFilesSparkAction (#7676)
    
    This commit backports PR #7630 to Spark 3.3.
---
 .../spark/actions/RewriteDataFilesSparkAction.java | 163 +++++++++------------
 .../spark/actions/TestRewriteDataFilesAction.java  |   3 +-
 2 files changed, 74 insertions(+), 92 deletions(-)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 5f95ef3ed4..a5a69dea95 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.spark.actions;
 import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -83,6 +82,9 @@ public class RewriteDataFilesSparkAction
           USE_STARTING_SEQUENCE_NUMBER,
           REWRITE_JOB_ORDER);
 
+  private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
+      ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+
   private final Table table;
 
   private Expression filter = Expressions.alwaysTrue();
@@ -146,7 +148,7 @@ public class RewriteDataFilesSparkAction
   @Override
   public RewriteDataFiles.Result execute() {
     if (table.currentSnapshot() == null) {
-      return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+      return EMPTY_RESULT;
     }
 
     long startingSnapshotId = table.currentSnapshot().snapshotId();
@@ -158,26 +160,25 @@ public class RewriteDataFilesSparkAction
 
     validateAndInitOptions();
 
-    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+    StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
         planFileGroups(startingSnapshotId);
     RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
 
     if (ctx.totalGroupCount() == 0) {
       LOG.info("Nothing found to rewrite in {}", table.name());
-      return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+      return EMPTY_RESULT;
     }
 
     Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
 
-    RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId);
     if (partialProgressEnabled) {
-      return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
+      return doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId));
     } else {
-      return doExecute(ctx, groupStream, commitManager);
+      return doExecute(ctx, groupStream, commitManager(startingSnapshotId));
     }
   }
 
-  Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
+  StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
     CloseableIterable<FileScanTask> fileScanTasks =
         table
             .newScan()
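
For readers unfamiliar with the type in the new signature: StructLikeMap keys
entries by the partition struct's field values against a known partition type,
which a plain HashMap cannot do since StructLike implementations do not
guarantee equals/hashCode. A minimal sketch of that behavior, with a made-up
single-field partition type (the field id and name are for illustration only):

    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.StructLikeMap;

    public class StructLikeMapSketch {
      public static void main(String[] args) {
        // Made-up single-field partition type, for illustration only.
        Types.StructType partitionType =
            Types.StructType.of(Types.NestedField.optional(1, "bucket", Types.IntegerType.get()));

        StructLikeMap<String> map = StructLikeMap.create(partitionType);

        GenericRecord p1 = GenericRecord.create(partitionType);
        p1.setField("bucket", 7);
        map.put(p1, "group-a");

        // A distinct record with the same field values finds the same entry,
        // because keys are compared by struct values against partitionType.
        GenericRecord p2 = GenericRecord.create(partitionType);
        p2.setField("bucket", 7);
        System.out.println(map.get(p2)); // prints: group-a
      }
    }

This value-based keying is what lets the action group tasks directly by
task.file().partition() below.
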
@@ -188,43 +189,9 @@ public class RewriteDataFilesSparkAction
 
     try {
       StructType partitionType = table.spec().partitionType();
-      StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
-      StructLike emptyStruct = GenericRecord.create(partitionType);
-
-      fileScanTasks.forEach(
-          task -> {
-            // If a task uses an incompatible partition spec the data inside could contain values
-            // which
-            // belong to multiple partitions in the current spec. Treating all such files as
-            // un-partitioned and
-            // grouping them together helps to minimize new files made.
-            StructLike taskPartition =
-                task.file().specId() == table.spec().specId()
-                    ? task.file().partition()
-                    : emptyStruct;
-
-            List<FileScanTask> files = filesByPartition.get(taskPartition);
-            if (files == null) {
-              files = Lists.newArrayList();
-            }
-
-            files.add(task);
-            filesByPartition.put(taskPartition, files);
-          });
-
-      StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
-          StructLikeMap.create(partitionType);
-
-      filesByPartition.forEach(
-          (partition, tasks) -> {
-            Iterable<List<FileScanTask>> plannedFileGroups = rewriter.planFileGroups(tasks);
-            List<List<FileScanTask>> fileGroups = ImmutableList.copyOf(plannedFileGroups);
-            if (fileGroups.size() > 0) {
-              fileGroupsByPartition.put(partition, fileGroups);
-            }
-          });
-
-      return fileGroupsByPartition;
+      StructLikeMap<List<FileScanTask>> filesByPartition =
+          groupByPartition(partitionType, fileScanTasks);
+      return fileGroupsByPartition(filesByPartition);
     } finally {
       try {
         fileScanTasks.close();
@@ -234,6 +201,38 @@ public class RewriteDataFilesSparkAction
     }
   }
 
+  private StructLikeMap<List<FileScanTask>> groupByPartition(
+      StructType partitionType, Iterable<FileScanTask> tasks) {
+    StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
+    StructLike emptyStruct = GenericRecord.create(partitionType);
+
+    for (FileScanTask task : tasks) {
+      // If a task uses an incompatible partition spec the data inside could contain values
+      // which belong to multiple partitions in the current spec. Treating all such files as
+      // un-partitioned and grouping them together helps to minimize new files made.
+      StructLike taskPartition =
+          task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;
+
+      List<FileScanTask> files = filesByPartition.get(taskPartition);
+      if (files == null) {
+        files = Lists.newArrayList();
+      }
+
+      files.add(task);
+      filesByPartition.put(taskPartition, files);
+    }
+    return filesByPartition;
+  }
+
+  private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(
+      StructLikeMap<List<FileScanTask>> filesByPartition) {
+    return filesByPartition.transformValues(this::planFileGroups);
+  }
+
+  private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
+    return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
+  }
+
   @VisibleForTesting
   RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
     String desc = jobDesc(fileGroup, ctx);
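
The extracted fileGroupsByPartition helper above leans on
StructLikeMap.transformValues, which, going by its two uses in this commit,
rebuilds the map with the same partition keys and values mapped through the
given function; RewriteExecutionContext below uses the same call with
List::size. A minimal sketch of the pattern, with plain strings standing in
for FileScanTask lists:

    import java.util.List;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.StructLikeMap;

    public class TransformValuesSketch {
      public static void main(String[] args) {
        // Made-up single-field partition type, for illustration only.
        Types.StructType partitionType =
            Types.StructType.of(Types.NestedField.optional(1, "bucket", Types.IntegerType.get()));

        StructLikeMap<List<String>> filesByPartition = StructLikeMap.create(partitionType);
        GenericRecord partition = GenericRecord.create(partitionType);
        partition.setField("bucket", 0);
        filesByPartition.put(partition, List.of("a.parquet", "b.parquet"));

        // Same partition keys, values mapped through the given function; the
        // action uses the same shape with this::planFileGroups and List::size.
        StructLikeMap<Integer> counts = filesByPartition.transformValues(List::size);
        System.out.println(counts.get(partition)); // prints: 2
      }
    }
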
@@ -299,7 +298,7 @@ public class RewriteDataFilesSparkAction
 
       Tasks.foreach(rewrittenGroups)
           .suppressFailureWhenFinished()
-          .run(group -> commitManager.abortFileGroup(group));
+          .run(commitManager::abortFileGroup);
       throw e;
     } finally {
       rewriteService.shutdown();
@@ -331,13 +330,13 @@ public class RewriteDataFilesSparkAction
       RewriteDataFilesCommitManager commitManager) {
     ExecutorService rewriteService = rewriteService();
 
-    // Start Commit Service
+    // start commit service
     int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
     RewriteDataFilesCommitManager.CommitService commitService =
         commitManager.service(groupsPerCommit);
     commitService.start();
 
-    // Start rewrite tasks
+    // start rewrite tasks
     Tasks.foreach(groupStream)
         .suppressFailureWhenFinished()
         .executeWith(rewriteService)
@@ -348,7 +347,7 @@ public class RewriteDataFilesSparkAction
         .run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
     rewriteService.shutdown();
 
-    // Stop Commit service
+    // stop commit service
     commitService.close();
     List<RewriteFileGroup> commitResults = commitService.results();
     if (commitResults.size() == 0) {
@@ -366,45 +365,29 @@ public class RewriteDataFilesSparkAction
   }
 
   Stream<RewriteFileGroup> toGroupStream(
-      RewriteExecutionContext ctx,
-      Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-    Stream<RewriteFileGroup> rewriteFileGroupStream =
-        fileGroupsByPartition.entrySet().stream()
-            .flatMap(
-                e -> {
-                  StructLike partition = e.getKey();
-                  List<List<FileScanTask>> fileGroups = e.getValue();
-                  return fileGroups.stream()
-                      .map(
-                          tasks -> {
-                            int globalIndex = ctx.currentGlobalIndex();
-                            int partitionIndex = ctx.currentPartitionIndex(partition);
-                            FileGroupInfo info =
-                                ImmutableRewriteDataFiles.FileGroupInfo.builder()
-                                    .globalIndex(globalIndex)
-                                    .partitionIndex(partitionIndex)
-                                    .partition(partition)
-                                    .build();
-                            return new RewriteFileGroup(info, tasks);
-                          });
-                });
-
-    return rewriteFileGroupStream.sorted(rewriteGroupComparator());
+      RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
+    return groupsByPartition.entrySet().stream()
+        .filter(e -> e.getValue().size() != 0)
+        .flatMap(
+            e -> {
+              StructLike partition = e.getKey();
+              List<List<FileScanTask>> scanGroups = e.getValue();
+              return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
+            })
+        .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
   }
 
-  private Comparator<RewriteFileGroup> rewriteGroupComparator() {
-    switch (rewriteJobOrder) {
-      case BYTES_ASC:
-        return Comparator.comparing(RewriteFileGroup::sizeInBytes);
-      case BYTES_DESC:
-        return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
-      case FILES_ASC:
-        return Comparator.comparing(RewriteFileGroup::numFiles);
-      case FILES_DESC:
-        return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
-      default:
-        return (fileGroupOne, fileGroupTwo) -> 0;
-    }
+  private RewriteFileGroup newRewriteGroup(
+      RewriteExecutionContext ctx, StructLike partition, List<FileScanTask> tasks) {
+    int globalIndex = ctx.currentGlobalIndex();
+    int partitionIndex = ctx.currentPartitionIndex(partition);
+    FileGroupInfo info =
+        ImmutableRewriteDataFiles.FileGroupInfo.builder()
+            .globalIndex(globalIndex)
+            .partitionIndex(partitionIndex)
+            .partition(partition)
+            .build();
+    return new RewriteFileGroup(info, tasks);
   }
 
   void validateAndInitOptions() {
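
The sort above now delegates to RewriteFileGroup.comparator(rewriteJobOrder),
which presumably centralizes the switch removed here so other engine
integrations can share the same ordering. A self-contained sketch of that
ordering logic, using hypothetical Group and JobOrder stand-ins rather than
the real RewriteFileGroup and RewriteJobOrder types:

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Stream;

    public class JobOrderSketch {
      // Hypothetical stand-ins for RewriteJobOrder and RewriteFileGroup.
      enum JobOrder { BYTES_ASC, BYTES_DESC, FILES_ASC, FILES_DESC, NONE }

      record Group(String name, long sizeInBytes, int numFiles) {}

      static Comparator<Group> comparator(JobOrder order) {
        switch (order) {
          case BYTES_ASC:
            return Comparator.comparing(Group::sizeInBytes);
          case BYTES_DESC:
            return Comparator.comparing(Group::sizeInBytes, Comparator.reverseOrder());
          case FILES_ASC:
            return Comparator.comparing(Group::numFiles);
          case FILES_DESC:
            return Comparator.comparing(Group::numFiles, Comparator.reverseOrder());
          default:
            return (a, b) -> 0; // leave stream order unchanged
        }
      }

      public static void main(String[] args) {
        List<Group> sorted =
            Stream.of(new Group("small", 10L, 9), new Group("big", 100L, 2))
                .sorted(comparator(JobOrder.BYTES_DESC))
                .toList();
        System.out.println(sorted); // "big" first: larger groups rewrite first
      }
    }
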
@@ -484,15 +467,13 @@ public class RewriteDataFilesSparkAction
 
   @VisibleForTesting
   static class RewriteExecutionContext {
-    private final Map<StructLike, Integer> numGroupsByPartition;
+    private final StructLikeMap<Integer> numGroupsByPartition;
     private final int totalGroupCount;
     private final Map<StructLike, Integer> partitionIndexMap;
     private final AtomicInteger groupIndex;
 
-    RewriteExecutionContext(Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-      this.numGroupsByPartition =
-          fileGroupsByPartition.entrySet().stream()
-              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+    RewriteExecutionContext(StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition) {
+      this.numGroupsByPartition = fileGroupsByPartition.transformValues(List::size);
       this.totalGroupCount = numGroupsByPartition.values().stream().reduce(Integer::sum).orElse(0);
       this.partitionIndexMap = Maps.newConcurrentMap();
       this.groupIndex = new AtomicInteger(1);
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 761284bb56..4dba479edd 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -100,6 +100,7 @@ import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeMap;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.Assert;
@@ -1383,7 +1384,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
 
   private Stream<RewriteFileGroup> toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) {
     rewrite.validateAndInitOptions();
-    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+    StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
         rewrite.planFileGroups(table.currentSnapshot().snapshotId());
 
     return rewrite.toGroupStream(