You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2021/12/15 07:18:18 UTC

[iceberg] branch master updated: Spark: Add back RewriteDataFilesAction in Spark 2.4 (#3685)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d6fc0ff  Spark: Add back RewriteDataFilesAction in Spark 2.4 (#3685)
d6fc0ff is described below

commit d6fc0ff46cd9f970cae13e45a78a2380523d65f4
Author: Ajantha Bhat <aj...@gmail.com>
AuthorDate: Wed Dec 15 12:48:03 2021 +0530

    Spark: Add back RewriteDataFilesAction in Spark 2.4 (#3685)
---
 .../java/org/apache/iceberg/actions/Actions.java   |  103 ++
 .../iceberg/actions/RewriteDataFilesAction.java    |   71 ++
 .../org/apache/iceberg/actions/SparkActions.java   |   34 +
 .../actions/BaseRewriteDataFilesSparkAction.java   |  436 ---------
 .../actions/TestRewriteDataFilesAction.java        |  479 ++++++++++
 .../spark/actions/TestRewriteDataFilesAction.java  | 1005 --------------------
 6 files changed, 687 insertions(+), 1441 deletions(-)

diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/actions/Actions.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/actions/Actions.java
new file mode 100644
index 0000000..73fd3f3
--- /dev/null
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/actions/Actions.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * An API for interacting with actions in Spark.
+ *
+ * @deprecated since 0.12.0, used for supporting {@link RewriteDataFilesAction} in Spark 2.4 for backward compatibility.
+ * This implementation is no longer maintained, the new implementation is available with Spark 3.x
+ */
+@Deprecated
+public class Actions {
+  // Load the actual implementation of Actions via reflection to allow for differences
+  // between the major Spark APIs while still defining the API in this class.
+  private static final String IMPL_NAME = "SparkActions";
+  // Cached by actionConstructor() after the first successful lookup; null until then.
+  private static DynConstructors.Ctor<Actions> implConstructor;
+
+  // Returns the constructor of the implementation class, resolving it reflectively on first use.
+  // The lookup is not synchronized, so concurrent first callers may each resolve the same constructor.
+  private static DynConstructors.Ctor<Actions> actionConstructor() {
+    if (implConstructor == null) {
+      String className = Actions.class.getPackage().getName() + "." + IMPL_NAME;
+      try {
+        implConstructor =
+            DynConstructors.builder().hiddenImpl(className, SparkSession.class, Table.class).buildChecked();
+      } catch (NoSuchMethodException e) {
+        throw new IllegalArgumentException("Cannot find appropriate Actions implementation on the classpath.", e);
+      }
+    }
+    return implConstructor;
+  }
+
+  private final SparkSession spark;
+  private final Table table;
+
+  protected Actions(SparkSession spark, Table table) {
+    this.spark = spark;
+    this.table = table;
+  }
+
+  /**
+   * Creates the reflectively loaded {@link Actions} implementation for the given Spark session and table.
+   *
+   * @deprecated since 0.12.0, used for supporting {@link RewriteDataFilesAction} in Spark 2.4 for backward
+   * compatibility. This implementation is no longer maintained, the new implementation is available with Spark 3.x
+   */
+  @Deprecated
+  public static Actions forTable(SparkSession spark, Table table) {
+    return actionConstructor().newInstance(spark, table);
+  }
+
+  /**
+   * Creates an {@link Actions} instance for the given table using the active Spark session.
+   *
+   * @deprecated since 0.12.0, used for supporting {@link RewriteDataFilesAction} in Spark 2.4 for backward
+   * compatibility. This implementation is no longer maintained, the new implementation is available with Spark 3.x
+   */
+  @Deprecated
+  public static Actions forTable(Table table) {
+    return forTable(SparkSession.active(), table);
+  }
+
+  /**
+   * Creates a {@link RewriteDataFilesAction} for this instance's Spark session and table.
+   *
+   * @deprecated since 0.12.0, used for supporting {@link RewriteDataFilesAction} in Spark 2.4 for backward
+   * compatibility. This implementation is no longer maintained, the new implementation is available with Spark 3.x
+   */
+  @Deprecated
+  public RewriteDataFilesAction rewriteDataFiles() {
+    return new RewriteDataFilesAction(spark, table);
+  }
+
+  protected SparkSession spark() {
+    return spark;
+  }
+
+  protected Table table() {
+    return table;
+  }
+}
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
new file mode 100644
index 0000000..0418a9c
--- /dev/null
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.RowDataRewriter;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Spark action that parallelizes the combined scan tasks and rewrites them through {@link RowDataRewriter}.
+ *
+ * @deprecated since 0.12.0, keeping this in Spark 2.4 for backward compatibility.
+ * This implementation is no longer maintained, the new implementation is available with Spark 3.x
+ */
+@Deprecated
+public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;  // created on the first fileIO() call and reused afterwards
+
+  RewriteDataFilesAction(SparkSession spark, Table table) {
+    super(table);
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+  }
+
+  @Override
+  protected RewriteDataFilesAction self() {
+    return this;
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    if (fileIO != null) {
+      return fileIO;
+    }
+    fileIO = SparkUtil.serializableFileIO(table());
+    return fileIO;
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+    JavaRDD<CombinedScanTask> tasks = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
+    Broadcast<Table> sharedTable = sparkContext.broadcast(SerializableTable.copyOf(table()));
+    return new RowDataRewriter(sharedTable, spec(), caseSensitive()).rewriteDataForTasks(tasks);
+  }
+}
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/actions/SparkActions.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/actions/SparkActions.java
new file mode 100644
index 0000000..97ec74b
--- /dev/null
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/actions/SparkActions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.Table;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * @deprecated since 0.12.0, used for supporting {@link RewriteDataFilesAction} in Spark 2.4 for backward compatibility.
+ * This implementation is no longer maintained, the new implementation is available with Spark 3.x
+ */
+@Deprecated
+class SparkActions extends Actions {  // located by name and instantiated reflectively by Actions
+  protected SparkActions(SparkSession spark, Table table) {  // parameter types must match the reflective lookup
+    super(spark, table);
+  }
+}
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
deleted file mode 100644
index 406b2bb..0000000
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark.actions;
-
-import java.io.IOException;
-import java.math.RoundingMode;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BaseRewriteDataFilesFileGroupInfo;
-import org.apache.iceberg.actions.BaseRewriteDataFilesResult;
-import org.apache.iceberg.actions.BinPackStrategy;
-import org.apache.iceberg.actions.RewriteDataFiles;
-import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
-import org.apache.iceberg.actions.RewriteFileGroup;
-import org.apache.iceberg.actions.RewriteStrategy;
-import org.apache.iceberg.actions.SortStrategy;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.exceptions.CommitFailedException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Queues;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.math.IntMath;
-import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
-import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.iceberg.types.Types.StructType;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.StructLikeMap;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-abstract class BaseRewriteDataFilesSparkAction
-    extends BaseSnapshotUpdateSparkAction<RewriteDataFiles, RewriteDataFiles.Result> implements RewriteDataFiles {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesSparkAction.class);
-  private static final Set<String> VALID_OPTIONS = ImmutableSet.of(
-      MAX_CONCURRENT_FILE_GROUP_REWRITES,
-      MAX_FILE_GROUP_SIZE_BYTES,
-      PARTIAL_PROGRESS_ENABLED,
-      PARTIAL_PROGRESS_MAX_COMMITS,
-      TARGET_FILE_SIZE_BYTES
-  );
-
-  private final Table table;
-
-  private Expression filter = Expressions.alwaysTrue();
-  private int maxConcurrentFileGroupRewrites;
-  private int maxCommits;
-  private boolean partialProgressEnabled;
-  private RewriteStrategy strategy = null;
-
-  protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
-    super(spark);
-    this.table = table;
-  }
-
-  protected Table table() {
-    return table;
-  }
-
-  /**
-   * The framework specific {@link BinPackStrategy}
-   */
-  protected abstract BinPackStrategy binPackStrategy();
-
-  /**
-   * The framework specific {@link SortStrategy}
-   */
-  protected abstract SortStrategy sortStrategy();
-
-  @Override
-  public RewriteDataFiles binPack() {
-    Preconditions.checkArgument(this.strategy == null,
-        "Cannot set strategy to binpack, it has already been set", this.strategy);
-    this.strategy = binPackStrategy();
-    return this;
-  }
-
-  @Override
-  public RewriteDataFiles sort(SortOrder sortOrder) {
-    Preconditions.checkArgument(this.strategy == null,
-        "Cannot set strategy to sort, it has already been set to %s", this.strategy);
-    this.strategy = sortStrategy().sortOrder(sortOrder);
-    return this;
-  }
-
-  @Override
-  public RewriteDataFiles sort() {
-    Preconditions.checkArgument(this.strategy == null,
-        "Cannot set strategy to sort, it has already been set to %s", this.strategy);
-    this.strategy = sortStrategy();
-    return this;
-  }
-
-  @Override
-  public RewriteDataFiles filter(Expression expression) {
-    filter = Expressions.and(filter, expression);
-    return this;
-  }
-
-  @Override
-  public RewriteDataFiles.Result execute() {
-    if (table.currentSnapshot() == null) {
-      return new BaseRewriteDataFilesResult(ImmutableList.of());
-    }
-
-    long startingSnapshotId = table.currentSnapshot().snapshotId();
-
-    // Default to BinPack if no strategy selected
-    if (this.strategy == null) {
-      this.strategy = binPackStrategy();
-    }
-
-    validateAndInitOptions();
-    strategy = strategy.options(options());
-
-    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition = planFileGroups(startingSnapshotId);
-    RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
-
-    if (ctx.totalGroupCount() == 0) {
-      LOG.info("Nothing found to rewrite in {}", table.name());
-      return new BaseRewriteDataFilesResult(Collections.emptyList());
-    }
-
-    Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
-
-    RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId);
-    if (partialProgressEnabled) {
-      return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
-    } else {
-      return doExecute(ctx, groupStream, commitManager);
-    }
-  }
-
-  private Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
-    CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
-        .useSnapshot(startingSnapshotId)
-        .filter(filter)
-        .ignoreResiduals()
-        .planFiles();
-
-    try {
-      StructType partitionType = table.spec().partitionType();
-      StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
-      StructLike emptyStruct = GenericRecord.create(partitionType);
-
-      fileScanTasks.forEach(task -> {
-        // If a task uses an incompatible partition spec the data inside could contain values which
-        // belong to multiple partitions in the current spec. Treating all such files as un-partitioned and
-        // grouping them together helps to minimize new files made.
-        StructLike taskPartition = task.file().specId() == table.spec().specId() ?
-            task.file().partition() : emptyStruct;
-
-        List<FileScanTask> files = filesByPartition.get(taskPartition);
-        if (files == null) {
-          files = Lists.newArrayList();
-        }
-
-        files.add(task);
-        filesByPartition.put(taskPartition, files);
-      });
-
-      StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition = StructLikeMap.create(partitionType);
-
-      filesByPartition.forEach((partition, tasks) -> {
-        Iterable<FileScanTask> filtered = strategy.selectFilesToRewrite(tasks);
-        Iterable<List<FileScanTask>> groupedTasks = strategy.planFileGroups(filtered);
-        List<List<FileScanTask>> fileGroups = ImmutableList.copyOf(groupedTasks);
-        if (fileGroups.size() > 0) {
-          fileGroupsByPartition.put(partition, fileGroups);
-        }
-      });
-
-      return fileGroupsByPartition;
-    } finally {
-      try {
-        fileScanTasks.close();
-      } catch (IOException io) {
-        LOG.error("Cannot properly close file iterable while planning for rewrite", io);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
-    String desc = jobDesc(fileGroup, ctx);
-    Set<DataFile> addedFiles = withJobGroupInfo(
-        newJobGroupInfo("REWRITE-DATA-FILES", desc),
-        () -> strategy.rewriteFiles(fileGroup.fileScans()));
-
-    fileGroup.setOutputFiles(addedFiles);
-    LOG.info("Rewrite Files Ready to be Committed - {}", desc);
-    return fileGroup;
-  }
-
-  private ExecutorService rewriteService() {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) Executors.newFixedThreadPool(
-            maxConcurrentFileGroupRewrites,
-            new ThreadFactoryBuilder()
-                .setNameFormat("Rewrite-Service-%d")
-                .build()));
-  }
-
-  @VisibleForTesting
-  RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
-    return new RewriteDataFilesCommitManager(table, startingSnapshotId);
-  }
-
-  private Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
-                           RewriteDataFilesCommitManager commitManager) {
-    ExecutorService rewriteService = rewriteService();
-
-    ConcurrentLinkedQueue<RewriteFileGroup> rewrittenGroups = Queues.newConcurrentLinkedQueue();
-
-    Tasks.Builder<RewriteFileGroup> rewriteTaskBuilder = Tasks.foreach(groupStream)
-        .executeWith(rewriteService)
-        .stopOnFailure()
-        .noRetry()
-        .onFailure((fileGroup, exception) -> {
-          LOG.warn("Failure during rewrite process for group {}", fileGroup.info(), exception);
-        });
-
-    try {
-      rewriteTaskBuilder.run(fileGroup -> {
-        rewrittenGroups.add(rewriteFiles(ctx, fileGroup));
-      });
-    } catch (Exception e) {
-      // At least one rewrite group failed, clean up all completed rewrites
-      LOG.error("Cannot complete rewrite, {} is not enabled and one of the file set groups failed to " +
-          "be rewritten. This error occurred during the writing of new files, not during the commit process. This " +
-          "indicates something is wrong that doesn't involve conflicts with other Iceberg operations. Enabling " +
-          "{} may help in this case but the root cause should be investigated. Cleaning up {} groups which finished " +
-          "being written.", PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_ENABLED, rewrittenGroups.size(), e);
-
-      Tasks.foreach(rewrittenGroups)
-          .suppressFailureWhenFinished()
-          .run(group -> commitManager.abortFileGroup(group));
-      throw e;
-    } finally {
-      rewriteService.shutdown();
-    }
-
-    try {
-      commitManager.commitOrClean(Sets.newHashSet(rewrittenGroups));
-    } catch (ValidationException | CommitFailedException e) {
-      String errorMessage = String.format(
-          "Cannot commit rewrite because of a ValidationException or CommitFailedException. This usually means that " +
-              "this rewrite has conflicted with another concurrent Iceberg operation. To reduce the likelihood of " +
-              "conflicts, set %s which will break up the rewrite into multiple smaller commits controlled by %s. " +
-              "Separate smaller rewrite commits can succeed independently while any commits that conflict with " +
-              "another Iceberg operation will be ignored. This mode will create additional snapshots in the table " +
-              "history, one for each commit.",
-          PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_MAX_COMMITS);
-      throw new RuntimeException(errorMessage, e);
-    }
-
-    List<FileGroupRewriteResult> rewriteResults = rewrittenGroups.stream()
-        .map(RewriteFileGroup::asResult)
-        .collect(Collectors.toList());
-    return new BaseRewriteDataFilesResult(rewriteResults);
-  }
-
-  private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
-                                              RewriteDataFilesCommitManager commitManager) {
-    ExecutorService rewriteService = rewriteService();
-
-    // Start Commit Service
-    int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
-    RewriteDataFilesCommitManager.CommitService commitService = commitManager.service(groupsPerCommit);
-    commitService.start();
-
-    // Start rewrite tasks
-    Tasks.foreach(groupStream)
-        .suppressFailureWhenFinished()
-        .executeWith(rewriteService)
-        .noRetry()
-        .onFailure((fileGroup, exception) -> LOG.error("Failure during rewrite group {}", fileGroup.info(), exception))
-        .run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
-    rewriteService.shutdown();
-
-    // Stop Commit service
-    commitService.close();
-    List<RewriteFileGroup> commitResults = commitService.results();
-    if (commitResults.size() == 0) {
-      LOG.error("{} is true but no rewrite commits succeeded. Check the logs to determine why the individual " +
-          "commits failed. If this is persistent it may help to increase {} which will break the rewrite operation " +
-          "into smaller commits.", PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_MAX_COMMITS);
-    }
-
-    List<FileGroupRewriteResult> rewriteResults = commitResults.stream()
-        .map(RewriteFileGroup::asResult)
-        .collect(Collectors.toList());
-    return new BaseRewriteDataFilesResult(rewriteResults);
-  }
-
-  private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
-                                                 Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-
-    // Todo Add intelligence to the order in which we do rewrites instead of just using partition order
-    return fileGroupsByPartition.entrySet().stream()
-        .flatMap(e -> {
-          StructLike partition = e.getKey();
-          List<List<FileScanTask>> fileGroups = e.getValue();
-          return fileGroups.stream().map(tasks -> {
-            int globalIndex = ctx.currentGlobalIndex();
-            int partitionIndex = ctx.currentPartitionIndex(partition);
-            FileGroupInfo info = new BaseRewriteDataFilesFileGroupInfo(globalIndex, partitionIndex, partition);
-            return new RewriteFileGroup(info, tasks);
-          });
-        });
-  }
-
-  private void validateAndInitOptions() {
-    Set<String> validOptions = Sets.newHashSet(strategy.validOptions());
-    validOptions.addAll(VALID_OPTIONS);
-
-    Set<String> invalidKeys = Sets.newHashSet(options().keySet());
-    invalidKeys.removeAll(validOptions);
-
-    Preconditions.checkArgument(invalidKeys.isEmpty(),
-        "Cannot use options %s, they are not supported by the action or the strategy %s",
-        invalidKeys, strategy.name());
-
-    maxConcurrentFileGroupRewrites = PropertyUtil.propertyAsInt(options(),
-        MAX_CONCURRENT_FILE_GROUP_REWRITES,
-        MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT);
-
-    maxCommits = PropertyUtil.propertyAsInt(options(),
-        PARTIAL_PROGRESS_MAX_COMMITS,
-        PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT);
-
-    partialProgressEnabled = PropertyUtil.propertyAsBoolean(options(),
-        PARTIAL_PROGRESS_ENABLED,
-        PARTIAL_PROGRESS_ENABLED_DEFAULT);
-
-    Preconditions.checkArgument(maxConcurrentFileGroupRewrites >= 1,
-        "Cannot set %s to %s, the value must be positive.",
-        MAX_CONCURRENT_FILE_GROUP_REWRITES, maxConcurrentFileGroupRewrites);
-
-    Preconditions.checkArgument(!partialProgressEnabled || partialProgressEnabled && maxCommits > 0,
-        "Cannot set %s to %s, the value must be positive when %s is true",
-        PARTIAL_PROGRESS_MAX_COMMITS, maxCommits, PARTIAL_PROGRESS_ENABLED);
-  }
-
-  private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) {
-    StructLike partition = group.info().partition();
-    if (partition.size() > 0) {
-      return String.format("Rewriting %d files (%s, file group %d/%d, %s (%d/%d)) in %s",
-          group.rewrittenFiles().size(),
-          strategy.name(), group.info().globalIndex(),
-          ctx.totalGroupCount(), partition, group.info().partitionIndex(), ctx.groupsInPartition(partition),
-          table.name());
-    } else {
-      return String.format("Rewriting %d files (%s, file group %d/%d) in %s",
-          group.rewrittenFiles().size(),
-          strategy.name(), group.info().globalIndex(), ctx.totalGroupCount(),
-          table.name());
-    }
-  }
-
-  @VisibleForTesting
-  static class RewriteExecutionContext {
-    private final Map<StructLike, Integer> numGroupsByPartition;
-    private final int totalGroupCount;
-    private final Map<StructLike, Integer> partitionIndexMap;
-    private final AtomicInteger groupIndex;
-
-    RewriteExecutionContext(Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-      this.numGroupsByPartition = fileGroupsByPartition.entrySet().stream()
-          .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
-      this.totalGroupCount = numGroupsByPartition.values().stream()
-          .reduce(Integer::sum)
-          .orElse(0);
-      this.partitionIndexMap = Maps.newConcurrentMap();
-      this.groupIndex = new AtomicInteger(1);
-    }
-
-    public int currentGlobalIndex() {
-      return groupIndex.getAndIncrement();
-    }
-
-    public int currentPartitionIndex(StructLike partition) {
-      return partitionIndexMap.merge(partition, 1, Integer::sum);
-    }
-
-    public int groupsInPartition(StructLike partition) {
-      return numGroupsByPartition.get(partition);
-    }
-
-    public int totalGroupCount() {
-      return totalGroupCount;
-    }
-  }
-}
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
new file mode 100644
index 0000000..91b50cf
--- /dev/null
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestRewriteDataFilesAction extends SparkTestBase {
+
+  private static final HadoopTables TABLES = new HadoopTables(new Configuration());
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "c1", Types.IntegerType.get()),
+      optional(2, "c2", Types.StringType.get()),
+      optional(3, "c3", Types.StringType.get())
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private String tableLocation = null;
+
+  @Before
+  public void setupTableLocation() throws Exception {
+    File tableDir = temp.newFolder();
+    this.tableLocation = tableDir.toURI().toString();
+  }
+
+  @Test
+  public void testRewriteDataFilesEmptyTable() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    Actions actions = Actions.forTable(table);
+
+    actions.rewriteDataFiles().execute();
+
+    Assert.assertNull("Table must stay empty", table.currentSnapshot());
+  }
+
+  @Test
+  public void testRewriteDataFilesUnpartitionedTable() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, null, "AAAA"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 4 data files before rewrite", 4, dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+
+    RewriteDataFilesActionResult result = actions.rewriteDataFiles().execute();
+    Assert.assertEquals("Action should rewrite 4 data files", 4, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = table.newScan().planFiles();
+    List<DataFile> dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 1 data file after rewrite", 1, dataFiles1.size());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
+  @Test
+  public void testRewriteDataFilesPartitionedTable() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .truncate("c2", 2)
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+    );
+    writeRecords(records3);
+
+    List<ThreeColumnRecord> records4 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+    );
+    writeRecords(records4);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 8 data files before rewrite", 8, dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+
+    RewriteDataFilesActionResult result = actions.rewriteDataFiles().execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 4 data files", 4, result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = table.newScan().planFiles();
+    List<DataFile> dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 4 data files after rewrite", 4, dataFiles1.size());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+    expectedRecords.addAll(records3);
+    expectedRecords.addAll(records4);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2", "c3")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
+  @Test
+  public void testRewriteDataFilesWithFilter() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .truncate("c2", 2)
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+    );
+    writeRecords(records3);
+
+    List<ThreeColumnRecord> records4 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+    );
+    writeRecords(records4);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 8 data files before rewrite", 8, dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .filter(Expressions.equal("c1", 1))
+        .filter(Expressions.startsWith("c2", "AA"))
+        .execute();
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = table.newScan().planFiles();
+    List<DataFile> dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 7 data files after rewrite", 7, dataFiles1.size());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+    expectedRecords.addAll(records3);
+    expectedRecords.addAll(records4);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2", "c3")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
+  @Test
+  public void testRewriteLargeTableHasResiduals() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "100");
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    // all records belong to the same partition
+    List<ThreeColumnRecord> records = Lists.newArrayList();
+    for (int i = 0; i < 100; i++) {
+      records.add(new ThreeColumnRecord(i, String.valueOf(i), String.valueOf(i % 4)));
+    }
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
+    writeDF(df);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan()
+        .ignoreResiduals()
+        .filter(Expressions.equal("c3", "0"))
+        .planFiles();
+    for (FileScanTask task : tasks) {
+      Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
+    }
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .filter(Expressions.equal("c3", "0"))
+        .execute();
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+    table.refresh();
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", records, actualRecords);
+  }
+
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList();
+
+    IntStream.range(0, 2000).forEach(i -> records1.add(new ThreeColumnRecord(i, "foo" + i, "bar" + i)));
+    Dataset<Row> df = spark.createDataFrame(records1, ThreeColumnRecord.class).repartition(1);
+    writeDF(df);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    DataFile maxSizeFile = Collections.max(dataFiles, Comparator.comparingLong(DataFile::fileSizeInBytes));
+    Assert.assertEquals("Should have 3 files before rewrite", 3, dataFiles.size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("origin");
+    long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> originalRecords = sql("SELECT * from origin sort by c2");
+
+    Actions actions = Actions.forTable(table);
+
+    long targetSizeInBytes = maxSizeFile.fileSizeInBytes() - 10;
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .targetSizeInBytes(targetSizeInBytes)
+        .splitOpenFileCost(1)
+        .execute();
+
+    Assert.assertEquals("Action should delete 3 data files", 3, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("postRewrite");
+    long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> rewrittenRecords = sql("SELECT * from postRewrite sort by c2");
+
+    Assert.assertEquals(originalNumRecords, postRewriteNumRecords);
+    assertEquals("Rows should be unchanged", originalRecords, rewrittenRecords);
+  }
+
+  private void writeRecords(List<ThreeColumnRecord> records) {
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
+    writeDF(df);
+  }
+
+  private void writeDF(Dataset<Row> df) {
+    df.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+  }
+
+  @Test
+  public void testRewriteToOutputPartitionSpec() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+
+    Assert.assertEquals("Should have 2 partition specs", 2, table.specs().size());
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+    );
+    writeRecords(records3);
+
+    List<ThreeColumnRecord> records4 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+    );
+    writeRecords(records4);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 8 data files before rewrite", 8, dataFiles.size());
+
+    Dataset<Row> beforeResultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> beforeActualFilteredRecords = beforeResultDF.sort("c1", "c2", "c3")
+        .filter("c1 = 1 AND c2 = 'BBBBBBBBBB'")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+    Assert.assertEquals("Rows must match", records2, beforeActualFilteredRecords);
+
+    Actions actions = Actions.forTable(table);
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .outputSpecId(0)
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size());
+
+    Assert.assertTrue(result.deletedDataFiles().stream().allMatch(df -> df.specId() == 1));
+    Assert.assertTrue(result.addedDataFiles().stream().allMatch(df -> df.specId() == 0));
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks2 = table.newScan().planFiles();
+    List<DataFile> dataFiles2 = Lists.newArrayList(CloseableIterable.transform(tasks2, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles2.size());
+
+    // Should still have all the same data
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+    expectedRecords.addAll(records3);
+    expectedRecords.addAll(records4);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2", "c3")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    List<ThreeColumnRecord> actualFilteredRecords = resultDF.sort("c1", "c2", "c3")
+        .filter("c1 = 1 AND c2 = 'BBBBBBBBBB'")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+    Assert.assertEquals("Rows must match", records2, actualFilteredRecords);
+
+    List<ThreeColumnRecord> records5 = Lists.newArrayList(
+        new ThreeColumnRecord(3, "CCCCCCCCCC", "FFFF"),
+        new ThreeColumnRecord(3, "CCCCCCCCCC", "HHHH")
+    );
+    writeRecords(records5);
+    expectedRecords.addAll(records5);
+    actualRecords = resultDF.sort("c1", "c2", "c3")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+}
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
deleted file mode 100644
index 0b2d44c..0000000
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ /dev/null
@@ -1,1005 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark.actions;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.actions.ActionsProvider;
-import org.apache.iceberg.actions.BinPackStrategy;
-import org.apache.iceberg.actions.RewriteDataFiles;
-import org.apache.iceberg.actions.RewriteDataFiles.Result;
-import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
-import org.apache.iceberg.actions.RewriteFileGroup;
-import org.apache.iceberg.actions.SortStrategy;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Streams;
-import org.apache.iceberg.spark.SparkTestBase;
-import org.apache.iceberg.spark.source.ThreeColumnRecord;
-import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.types.Types.NestedField;
-import org.apache.iceberg.util.Pair;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
-
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
-
-public abstract class TestRewriteDataFilesAction extends SparkTestBase {
-
-  protected abstract ActionsProvider actions();
-  protected abstract Set<String> cacheContents(Table table);
-
-  private static final HadoopTables TABLES = new HadoopTables(new Configuration());
-  private static final Schema SCHEMA = new Schema(
-      optional(1, "c1", Types.IntegerType.get()),
-      optional(2, "c2", Types.StringType.get()),
-      optional(3, "c3", Types.StringType.get())
-  );
-
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-
-  private String tableLocation = null;
-
-  @Before
-  public void setupTableLocation() throws Exception {
-    File tableDir = temp.newFolder();
-    this.tableLocation = tableDir.toURI().toString();
-  }
-
-  private RewriteDataFiles basicRewrite(Table table) {
-    // Always compact regardless of input files
-    table.refresh();
-    return actions().rewriteDataFiles(table).option(BinPackStrategy.MIN_INPUT_FILES, "1");
-  }
-
-  @Test
-  public void testEmptyTable() {
-    PartitionSpec spec = PartitionSpec.unpartitioned();
-    Map<String, String> options = Maps.newHashMap();
-    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
-
-    Assert.assertNull("Table must be empty", table.currentSnapshot());
-
-    basicRewrite(table).execute();
-
-    Assert.assertNull("Table must stay empty", table.currentSnapshot());
-  }
-
-  @Test
-  public void testBinPackUnpartitionedTable() {
-    Table table = createTable(4);
-    shouldHaveFiles(table, 4);
-    List<Object[]> expectedRecords = currentData();
-
-    Result result = basicRewrite(table).execute();
-    Assert.assertEquals("Action should rewrite 4 data files", 4, result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFilesCount());
-
-    shouldHaveFiles(table, 1);
-    List<Object[]> actual = currentData();
-
-    assertEquals("Rows must match", expectedRecords, actual);
-  }
-
-  @Test
-  public void testBinPackPartitionedTable() {
-    Table table = createTablePartitioned(4, 2);
-    shouldHaveFiles(table, 8);
-    List<Object[]> expectedRecords = currentData();
-
-    Result result = basicRewrite(table).execute();
-    Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());
-
-    shouldHaveFiles(table, 4);
-    List<Object[]> actualRecords = currentData();
-
-    assertEquals("Rows must match", expectedRecords, actualRecords);
-  }
-
-  @Test
-  public void testBinPackWithFilter() {
-    Table table = createTablePartitioned(4, 2);
-    shouldHaveFiles(table, 8);
-    List<Object[]> expectedRecords = currentData();
-
-    Result result = basicRewrite(table)
-        .filter(Expressions.equal("c1", 1))
-        .filter(Expressions.startsWith("c2", "foo"))
-        .execute();
-
-    Assert.assertEquals("Action should rewrite 2 data files", 2, result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFilesCount());
-
-    shouldHaveFiles(table, 7);
-
-    List<Object[]> actualRecords = currentData();
-    assertEquals("Rows must match", expectedRecords, actualRecords);
-  }
-
-  @Test
-  public void testRewriteLargeTableHasResiduals() {
-    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
-    Map<String, String> options = Maps.newHashMap();
-    options.put(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "100");
-    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
-
-    // all records belong to the same partition
-    List<ThreeColumnRecord> records = Lists.newArrayList();
-    for (int i = 0; i < 100; i++) {
-      records.add(new ThreeColumnRecord(i, String.valueOf(i), String.valueOf(i % 4)));
-    }
-    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
-    writeDF(df);
-
-    List<Object[]> expectedRecords = currentData();
-
-    table.refresh();
-
-    CloseableIterable<FileScanTask> tasks = table.newScan()
-        .ignoreResiduals()
-        .filter(Expressions.equal("c3", "0"))
-        .planFiles();
-    for (FileScanTask task : tasks) {
-      Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
-    }
-
-    shouldHaveFiles(table, 2);
-
-    Result result = basicRewrite(table)
-        .filter(Expressions.equal("c3", "0"))
-        .execute();
-    Assert.assertEquals("Action should rewrite 2 data files", 2, result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFilesCount());
-
-    List<Object[]> actualRecords = currentData();
-
-    assertEquals("Rows must match", expectedRecords, actualRecords);
-  }
-
-  @Test
-  public void testBinPackSplitLargeFile() {
-    Table table = createTable(1);
-    shouldHaveFiles(table, 1);
-
-    List<Object[]> expectedRecords = currentData();
-    long targetSize = testDataSize(table) / 2;
-
-    Result result = basicRewrite(table)
-        .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(targetSize))
-        .execute();
-
-    Assert.assertEquals("Action should delete 1 data files", 1, result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFilesCount());
-
-    shouldHaveFiles(table, 2);
-
-    List<Object[]> actualRecords = currentData();
-    assertEquals("Rows must match", expectedRecords, actualRecords);
-  }
-
-  @Test
-  public void testBinPackCombineMixedFiles() {
-    // One file too big
-    Table table = createTable(1);
-    shouldHaveFiles(table, 1);
-
-    // Two files too small
-    writeRecords(1, 100);
-    writeRecords(1, 100);
-    shouldHaveFiles(table, 3);
-
-    List<Object[]> expectedRecords = currentData();
-
-    int targetSize = averageFileSize(table);
-
-    Result result = basicRewrite(table)
-        .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(targetSize))
-        .option(BinPackStrategy.MAX_FILE_SIZE_BYTES, Integer.toString(targetSize + 1000))
-        .option(BinPackStrategy.MIN_FILE_SIZE_BYTES, Integer.toString(targetSize - 1000))
-        .execute();
-
-    Assert.assertEquals("Action should delete 3 data files", 3, result.rewrittenDataFilesCount());
-    // Should Split the big files into 3 pieces, one of which should be combined with the two smaller files
-    Assert.assertEquals("Action should add 3 data files", 3, result.addedDataFilesCount());
-
-    shouldHaveFiles(table, 3);
-
-    List<Object[]> actualRecords = currentData();
-    assertEquals("Rows must match", expectedRecords, actualRecords);
-  }
-
-  @Test
-  public void testPartialProgressEnabled() {
-    Table table = createTable(20);
-    int fileSize = averageFileSize(table);
-
-    List<Object[]> originalData = currentData();
-
-    // Perform a rewrite but only allow 2 files to be compacted at a time
-    RewriteDataFiles.Result result =
-        basicRewrite(table)
-            .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
-            .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000))
-            .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "10")
-            .execute();
-
-    Assert.assertEquals("Should have 10 fileGroups", result.rewriteResults().size(), 10);
-
-    table.refresh();
-
-    shouldHaveSnapshots(table, 11);
-    shouldHaveACleanCache(table);
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-  }
-
-  /**
-   * Rewrites a table created with 20 files using a capped file group size and a minimum of one input
-   * file, then checks that 10 file groups were rewritten, the data is unchanged, the table has 2
-   * snapshots and the cache is clean.
-   */
-  @Test
-  public void testMultipleGroups() {
-    Table table = createTable(20);
-    int fileSize = averageFileSize(table);
-
-    List<Object[]> originalData = currentData();
-
-    // Perform a rewrite but only allow 2 files to be compacted at a time
-    RewriteDataFiles.Result result =
-        basicRewrite(table)
-            .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000))
-            .option(BinPackStrategy.MIN_INPUT_FILES, "1")
-            .execute();
-
-    // JUnit takes (message, expected, actual); this order keeps the failure message accurate
-    Assert.assertEquals("Should have 10 fileGroups", 10, result.rewriteResults().size());
-
-    table.refresh();
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-
-    shouldHaveSnapshots(table, 2);
-    shouldHaveACleanCache(table);
-  }
-
-  /**
-   * Rewrites a table created with 20 files with partial progress enabled and a maximum of 3 commits,
-   * then checks that 10 file groups were rewritten, the data is unchanged, the table has 4 snapshots
-   * and the cache is clean.
-   */
-  @Test
-  public void testPartialProgressMaxCommits() {
-    Table table = createTable(20);
-    int fileSize = averageFileSize(table);
-
-    List<Object[]> originalData = currentData();
-
-    // Perform a rewrite but only allow 2 files to be compacted at a time
-    RewriteDataFiles.Result result =
-        basicRewrite(table)
-            .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000))
-            .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
-            .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "3")
-            .execute();
-
-    // JUnit takes (message, expected, actual); this order keeps the failure message accurate
-    Assert.assertEquals("Should have 10 fileGroups", 10, result.rewriteResults().size());
-
-    table.refresh();
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-
-    shouldHaveSnapshots(table, 4);
-    shouldHaveACleanCache(table);
-  }
-
-  /**
-   * Without partial progress, a failure while rewriting some file groups must fail the whole action:
-   * the data stays the same, the table keeps a single snapshot, and no orphan files or cache entries
-   * are left behind.
-   */
-  @Test
-  public void testSingleCommitWithRewriteFailure() {
-    Table table = createTable(20);
-    int avgFileSize = averageFileSize(table);
-
-    List<Object[]> dataBeforeRewrite = currentData();
-
-    // Cap every file group at roughly two average-sized files
-    String maxGroupSizeBytes = Integer.toString(avgFileSize * 2 + 1000);
-    BaseRewriteDataFilesSparkAction rewriteAction = (BaseRewriteDataFilesSparkAction) basicRewrite(table)
-        .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, maxGroupSizeBytes);
-    BaseRewriteDataFilesSparkAction spiedAction = Mockito.spy(rewriteAction);
-
-    // Rewriting the groups whose global index is 1, 3 or 7 throws
-    doThrow(new RuntimeException("Rewrite Failed"))
-        .when(spiedAction)
-        .rewriteFiles(any(), argThat(new GroupInfoMatcher(1, 3, 7)));
-
-    AssertHelpers.assertThrows(
-        "Should fail entire rewrite if part fails",
-        RuntimeException.class,
-        () -> spiedAction.execute());
-
-    table.refresh();
-
-    List<Object[]> dataAfterRewrite = currentData();
-    assertEquals("We shouldn't have changed the data", dataBeforeRewrite, dataAfterRewrite);
-
-    shouldHaveSnapshots(table, 1);
-    shouldHaveNoOrphans(table);
-    shouldHaveACleanCache(table);
-  }
-
-  /**
-   * Without partial progress, a failing commit must fail the whole action: the data stays the same,
-   * the table keeps a single snapshot, and no orphan files or cache entries are left behind.
-   */
-  @Test
-  public void testSingleCommitWithCommitFailure() {
-    Table table = createTable(20);
-    int avgFileSize = averageFileSize(table);
-
-    List<Object[]> dataBeforeRewrite = currentData();
-
-    // Cap every file group at roughly two average-sized files
-    String maxGroupSizeBytes = Integer.toString(avgFileSize * 2 + 1000);
-    BaseRewriteDataFilesSparkAction rewriteAction = (BaseRewriteDataFilesSparkAction) basicRewrite(table)
-        .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, maxGroupSizeBytes);
-
-    BaseRewriteDataFilesSparkAction spiedAction = spy(rewriteAction);
-    RewriteDataFilesCommitManager failingCommitManager = spy(new RewriteDataFilesCommitManager(table));
-
-    // Every attempt to commit file groups throws
-    doThrow(new RuntimeException("Commit Failure"))
-        .when(failingCommitManager)
-        .commitFileGroups(any());
-
-    // Hand the failing commit manager to the action when it asks for one at the current snapshot
-    doReturn(failingCommitManager)
-        .when(spiedAction)
-        .commitManager(table.currentSnapshot().snapshotId());
-
-    AssertHelpers.assertThrows(
-        "Should fail entire rewrite if commit fails",
-        RuntimeException.class,
-        () -> spiedAction.execute());
-
-    table.refresh();
-
-    List<Object[]> dataAfterRewrite = currentData();
-    assertEquals("We shouldn't have changed the data", dataBeforeRewrite, dataAfterRewrite);
-
-    shouldHaveSnapshots(table, 1);
-    shouldHaveNoOrphans(table);
-    shouldHaveACleanCache(table);
-  }
-
-  /**
-   * Same scenario as the single-commit rewrite failure, but with up to 3 file groups rewritten
-   * concurrently: the action must fail and leave data, snapshots, files and cache untouched.
-   */
-  @Test
-  public void testParallelSingleCommitWithRewriteFailure() {
-    Table table = createTable(20);
-    int avgFileSize = averageFileSize(table);
-
-    List<Object[]> dataBeforeRewrite = currentData();
-
-    // Cap every file group at roughly two average-sized files and rewrite 3 groups at a time
-    String maxGroupSizeBytes = Integer.toString(avgFileSize * 2 + 1000);
-    BaseRewriteDataFilesSparkAction rewriteAction = (BaseRewriteDataFilesSparkAction) basicRewrite(table)
-        .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, maxGroupSizeBytes)
-        .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "3");
-    BaseRewriteDataFilesSparkAction spiedAction = Mockito.spy(rewriteAction);
-
-    // Rewriting the groups whose global index is 1, 3 or 7 throws
-    doThrow(new RuntimeException("Rewrite Failed"))
-        .when(spiedAction)
-        .rewriteFiles(any(), argThat(new GroupInfoMatcher(1, 3, 7)));
-
-    AssertHelpers.assertThrows(
-        "Should fail entire rewrite if part fails",
-        RuntimeException.class,
-        () -> spiedAction.execute());
-
-    table.refresh();
-
-    List<Object[]> dataAfterRewrite = currentData();
-    assertEquals("We shouldn't have changed the data", dataBeforeRewrite, dataAfterRewrite);
-
-    shouldHaveSnapshots(table, 1);
-    shouldHaveNoOrphans(table);
-    shouldHaveACleanCache(table);
-  }
-
-  /**
-   * With partial progress enabled and at most 3 commits, failing the rewrite of 3 file groups must
-   * not fail the action: 7 file groups are reported, the data is unchanged, the table ends with 3
-   * snapshots, and no orphan files or cache entries remain.
-   */
-  @Test
-  public void testPartialProgressWithRewriteFailure() {
-    Table table = createTable(20);
-    int fileSize = averageFileSize(table);
-
-    List<Object[]> originalData = currentData();
-
-    BaseRewriteDataFilesSparkAction realRewrite =
-        (org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction)
-            basicRewrite(table)
-                .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000))
-                .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
-                .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "3");
-
-    BaseRewriteDataFilesSparkAction spyRewrite = Mockito.spy(realRewrite);
-
-    // Fail groups 1, 3, and 7 during rewrite
-    GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7);
-    doThrow(new RuntimeException("Rewrite Failed"))
-        .when(spyRewrite)
-        .rewriteFiles(any(), argThat(failGroup));
-
-    RewriteDataFiles.Result result = spyRewrite.execute();
-
-    // JUnit takes (message, expected, actual); this order keeps the failure message accurate
-    Assert.assertEquals("Should have 7 fileGroups", 7, result.rewriteResults().size());
-
-    table.refresh();
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-
-    // With 10 original groups and Max Commits of 3, we should have commits with 4, 4, and 2.
-    // removing 3 groups leaves us with only 2 new commits, 4 and 3
-    shouldHaveSnapshots(table, 3);
-    shouldHaveNoOrphans(table);
-    shouldHaveACleanCache(table);
-  }
-
-  /**
-   * Same scenario as the partial-progress rewrite failure, but with up to 3 file groups rewritten
-   * concurrently: 7 file groups are reported, the data is unchanged, the table ends with 3 snapshots,
-   * and no orphan files or cache entries remain.
-   */
-  @Test
-  public void testParallelPartialProgressWithRewriteFailure() {
-    Table table = createTable(20);
-    int fileSize = averageFileSize(table);
-
-    List<Object[]> originalData = currentData();
-
-    BaseRewriteDataFilesSparkAction realRewrite =
-        (org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction)
-            basicRewrite(table)
-                .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000))
-                .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "3")
-                .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
-                .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "3");
-
-    BaseRewriteDataFilesSparkAction spyRewrite = Mockito.spy(realRewrite);
-
-    // Fail groups 1, 3, and 7 during rewrite
-    GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7);
-    doThrow(new RuntimeException("Rewrite Failed"))
-        .when(spyRewrite)
-        .rewriteFiles(any(), argThat(failGroup));
-
-    RewriteDataFiles.Result result = spyRewrite.execute();
-
-    // JUnit takes (message, expected, actual); this order keeps the failure message accurate
-    Assert.assertEquals("Should have 7 fileGroups", 7, result.rewriteResults().size());
-
-    table.refresh();
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-
-    // With 10 original groups and Max Commits of 3, we should have commits with 4, 4, and 2.
-    // removing 3 groups leaves us with only 2 new commits, 4 and 3
-    shouldHaveSnapshots(table, 3);
-    shouldHaveNoOrphans(table);
-    shouldHaveACleanCache(table);
-  }
-
-  /**
-   * With partial progress enabled, at most 3 commits and 3 concurrent group rewrites, a failure of
-   * the second commit must not fail the action: 6 file groups are reported, the data is unchanged,
-   * the table ends with 3 snapshots, and no orphan files or cache entries remain.
-   */
-  @Test
-  public void testParallelPartialProgressWithCommitFailure() {
-    Table table = createTable(20);
-    int fileSize = averageFileSize(table);
-
-    List<Object[]> originalData = currentData();
-
-    BaseRewriteDataFilesSparkAction realRewrite =
-        (org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction)
-            basicRewrite(table)
-                .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000))
-                .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "3")
-                .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
-                .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "3");
-
-    BaseRewriteDataFilesSparkAction spyRewrite = spy(realRewrite);
-    RewriteDataFilesCommitManager util = spy(new RewriteDataFilesCommitManager(table));
-
-    // First and Third commits work, second does not
-    // (consecutive stubbing: the answers are applied to successive commitFileGroups calls in order)
-    doCallRealMethod()
-        .doThrow(new RuntimeException("Commit Failed"))
-        .doCallRealMethod()
-        .when(util)
-        .commitFileGroups(any());
-
-    // Make the spied action use the stubbed commit manager for the current snapshot id
-    doReturn(util)
-        .when(spyRewrite)
-        .commitManager(table.currentSnapshot().snapshotId());
-
-    RewriteDataFiles.Result result = spyRewrite.execute();
-
-    // Commit 1: 4/4 + Commit 2 failed 0/4 + Commit 3: 2/2 == 6 out of 10 total groups committed
-    Assert.assertEquals("Should have 6 fileGroups", 6, result.rewriteResults().size());
-
-    table.refresh();
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-
-    // Only 2 new commits because we broke one
-    shouldHaveSnapshots(table, 3);
-    shouldHaveNoOrphans(table);
-    shouldHaveACleanCache(table);
-  }
-
-  /**
-   * Checks that executing a rewrite with a negative partial-progress max-commits value, a negative
-   * max-concurrent-rewrites value, or an unknown option key throws {@link IllegalArgumentException}.
-   */
-  @Test
-  public void testInvalidOptions() {
-    Table testTable = createTable(20);
-
-    AssertHelpers.assertThrows(
-        "No negative values for partial progress max commits",
-        IllegalArgumentException.class,
-        () -> basicRewrite(testTable)
-            .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
-            .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "-5")
-            .execute());
-
-    AssertHelpers.assertThrows(
-        "No negative values for max concurrent groups",
-        IllegalArgumentException.class,
-        () -> basicRewrite(testTable).option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "-5").execute());
-
-    AssertHelpers.assertThrows(
-        "No unknown options allowed",
-        IllegalArgumentException.class,
-        () -> basicRewrite(testTable).option("foobarity", "-5").execute());
-  }
-
-  /**
-   * Sets the table sort order to ascending {@code c2}, runs a sort rewrite of all files with a capped
-   * file group size, then checks that 10 file groups were rewritten, the data is unchanged, the table
-   * has 2 snapshots and the cache is clean.
-   */
-  @Test
-  public void testSortMultipleGroups() {
-    Table table = createTable(20);
-    shouldHaveFiles(table, 20);
-    table.replaceSortOrder().asc("c2").commit();
-    shouldHaveLastCommitUnsorted(table, "c2");
-    int fileSize = averageFileSize(table);
-
-    List<Object[]> originalData = currentData();
-
-    // Perform a rewrite but only allow 2 files to be compacted at a time
-    RewriteDataFiles.Result result =
-        basicRewrite(table)
-            .sort()
-            .option(SortStrategy.REWRITE_ALL, "true")
-            .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000))
-            .execute();
-
-    // JUnit takes (message, expected, actual); this order keeps the failure message accurate
-    Assert.assertEquals("Should have 10 fileGroups", 10, result.rewriteResults().size());
-
-    table.refresh();
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-
-    shouldHaveSnapshots(table, 2);
-    shouldHaveACleanCache(table);
-  }
-
-  /**
-   * Sets the table sort order to ascending {@code c2}, runs a sort rewrite of all files targeting the
-   * average file size, then checks for a single file group, unchanged data, 2 snapshots, a clean
-   * cache, more than one file, and no overlapping {@code c2} bounds in the last commit.
-   */
-  @Test
-  public void testSimpleSort() {
-    Table table = createTable(20);
-    shouldHaveFiles(table, 20);
-    table.replaceSortOrder().asc("c2").commit();
-    shouldHaveLastCommitUnsorted(table, "c2");
-
-    List<Object[]> originalData = currentData();
-
-    RewriteDataFiles.Result result =
-        basicRewrite(table)
-            .sort()
-            .option(SortStrategy.MIN_INPUT_FILES, "1")
-            .option(SortStrategy.REWRITE_ALL, "true")
-            .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table)))
-            .execute();
-
-    // JUnit takes (message, expected, actual); this order keeps the failure message accurate
-    Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size());
-
-    table.refresh();
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-
-    shouldHaveSnapshots(table, 2);
-    shouldHaveACleanCache(table);
-    shouldHaveMultipleFiles(table);
-    shouldHaveLastCommitSorted(table, "c2");
-  }
-
-  /**
-   * Adds a {@code bucket(c1, 4)} partition field and an ascending {@code c2} sort order to a table
-   * that already holds 20 files, runs a sort rewrite of all files, then checks for a single file
-   * group, unchanged data, 2 snapshots, a clean cache, more than one file, and no overlapping
-   * {@code c2} bounds in the last commit.
-   */
-  @Test
-  public void testSortAfterPartitionChange() {
-    Table table = createTable(20);
-    shouldHaveFiles(table, 20);
-    table.updateSpec().addField(Expressions.bucket("c1", 4)).commit();
-    table.replaceSortOrder().asc("c2").commit();
-    shouldHaveLastCommitUnsorted(table, "c2");
-
-    List<Object[]> originalData = currentData();
-
-    RewriteDataFiles.Result result =
-        basicRewrite(table)
-            .sort()
-            .option(SortStrategy.MIN_INPUT_FILES, "1")
-            .option(SortStrategy.REWRITE_ALL, "true")
-            .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table)))
-            .execute();
-
-    // JUnit takes (message, expected, actual); this order keeps the failure message accurate
-    Assert.assertEquals("Should have 1 fileGroup because all files were not correctly partitioned",
-        1, result.rewriteResults().size());
-
-    table.refresh();
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-
-    shouldHaveSnapshots(table, 2);
-    shouldHaveACleanCache(table);
-    shouldHaveMultipleFiles(table);
-    shouldHaveLastCommitSorted(table, "c2");
-  }
-
-  /**
-   * Runs a sort rewrite of all files with an explicitly supplied ascending {@code c2} sort order,
-   * then checks for a single file group, unchanged data, 2 snapshots, a clean cache, more than one
-   * file, and no overlapping {@code c2} bounds in the last commit.
-   */
-  @Test
-  public void testSortCustomSortOrder() {
-    Table table = createTable(20);
-    shouldHaveLastCommitUnsorted(table, "c2");
-    shouldHaveFiles(table, 20);
-
-    List<Object[]> originalData = currentData();
-
-    RewriteDataFiles.Result result =
-        basicRewrite(table)
-            .sort(SortOrder.builderFor(table.schema()).asc("c2").build())
-            .option(SortStrategy.REWRITE_ALL, "true")
-            .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table)))
-            .execute();
-
-    // JUnit takes (message, expected, actual); this order keeps the failure message accurate
-    Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size());
-
-    table.refresh();
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-
-    shouldHaveSnapshots(table, 2);
-    shouldHaveACleanCache(table);
-    shouldHaveMultipleFiles(table);
-    shouldHaveLastCommitSorted(table, "c2");
-  }
-
-  /**
-   * Adds a {@code c1} partition field to a table that already holds 20 files, then runs a sort
-   * rewrite of all files with an explicitly supplied ascending {@code c3} sort order. Checks for a
-   * single file group, unchanged data, 2 snapshots, a clean cache, more than one file, overlapping
-   * {@code c2} bounds and non-overlapping {@code c3} bounds in the last commit.
-   */
-  @Test
-  public void testSortCustomSortOrderRequiresRepartition() {
-    Table table = createTable(20);
-    shouldHaveLastCommitUnsorted(table, "c3");
-
-    // Add a partition column so this requires repartitioning
-    table.updateSpec().addField("c1").commit();
-    // Add a sort order which our repartitioning needs to ignore
-    // NOTE(review): this calls apply() rather than commit(), unlike the other sort tests in this
-    // class - confirm whether the sort order is actually meant to be committed to the table here
-    table.replaceSortOrder().asc("c2").apply();
-    shouldHaveFiles(table, 20);
-
-    List<Object[]> originalData = currentData();
-
-    RewriteDataFiles.Result result =
-        basicRewrite(table)
-            .sort(SortOrder.builderFor(table.schema()).asc("c3").build())
-            .option(SortStrategy.REWRITE_ALL, "true")
-            .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table)))
-            .execute();
-
-    // JUnit takes (message, expected, actual); this order keeps the failure message accurate
-    Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size());
-
-    table.refresh();
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-
-    shouldHaveSnapshots(table, 2);
-    shouldHaveACleanCache(table);
-    shouldHaveMultipleFiles(table);
-    shouldHaveLastCommitUnsorted(table, "c2");
-    shouldHaveLastCommitSorted(table, "c3");
-  }
-
-  /**
-   * Runs a sort rewrite by ascending {@code c2} with the target file size set to half the average
-   * file size, then checks for a single file group, at least 40 added files, unchanged data, 2
-   * snapshots, a clean cache, more than one file, and no overlapping {@code c2} bounds in the last
-   * commit.
-   */
-  @Test
-  public void testAutoSortShuffleOutput() {
-    Table table = createTable(20);
-    shouldHaveLastCommitUnsorted(table, "c2");
-    shouldHaveFiles(table, 20);
-
-    List<Object[]> originalData = currentData();
-
-    RewriteDataFiles.Result result =
-        basicRewrite(table)
-            .sort(SortOrder.builderFor(table.schema()).asc("c2").build())
-            .option(SortStrategy.MAX_FILE_SIZE_BYTES, Integer.toString((averageFileSize(table) / 2) + 2))
-            // Divide files in 2
-            .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) / 2))
-            .option(SortStrategy.MIN_INPUT_FILES, "1")
-            .execute();
-
-    // JUnit takes (message, expected, actual); this order keeps the failure message accurate
-    Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size());
-    Assert.assertTrue("Should have written 40+ files", Iterables.size(table.currentSnapshot().addedFiles()) >= 40);
-
-    table.refresh();
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-
-    shouldHaveSnapshots(table, 2);
-    shouldHaveACleanCache(table);
-    shouldHaveMultipleFiles(table);
-    shouldHaveLastCommitSorted(table, "c2");
-  }
-
-  /**
-   * Simulates a commit whose outcome is reported as unknown even though it went through: the stubbed
-   * commit manager runs the real commit and then throws {@link CommitStateUnknownException}. The
-   * exception must propagate out of the action, the data must be unchanged, and the table must have
-   * 2 snapshots.
-   */
-  @Test
-  public void testCommitStateUnknownException() {
-    Table table = createTable(20);
-    shouldHaveFiles(table, 20);
-
-    List<Object[]> originalData = currentData();
-
-    BaseRewriteDataFilesSparkAction action = (BaseRewriteDataFilesSparkAction) basicRewrite(table);
-    BaseRewriteDataFilesSparkAction spyAction = spy(action);
-    RewriteDataFilesCommitManager util = spy(new RewriteDataFilesCommitManager(table));
-
-    // Run the real commit first, then report the commit state as unknown
-    doAnswer(invocationOnMock -> {
-      invocationOnMock.callRealMethod();
-      throw new CommitStateUnknownException(new RuntimeException("Unknown State"));
-    }).when(util).commitFileGroups(any());
-
-    // Make the spied action use the stubbed commit manager for the current snapshot id
-    doReturn(util)
-        .when(spyAction)
-        .commitManager(table.currentSnapshot().snapshotId());
-
-    AssertHelpers.assertThrows("Should propagate CommitStateUnknown Exception",
-        CommitStateUnknownException.class, () -> spyAction.execute());
-
-    List<Object[]> postRewriteData = currentData();
-    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
-
-    shouldHaveSnapshots(table, 2); // Commit actually Succeeded
-  }
-
-  /**
-   * Checks that choosing a second rewrite strategy on the same action (bin-pack after sort, or sort
-   * after bin-pack) throws {@link IllegalArgumentException} with a "Cannot set strategy" message.
-   */
-  @Test
-  public void testInvalidAPIUsage() {
-    Table table = createTable(1);
-
-    String failureMessage = "Should be unable to set Strategy more than once";
-    String expectedError = "Cannot set strategy";
-
-    AssertHelpers.assertThrows(
-        failureMessage,
-        IllegalArgumentException.class,
-        expectedError,
-        () -> actions().rewriteDataFiles(table).binPack().sort());
-
-    AssertHelpers.assertThrows(
-        failureMessage,
-        IllegalArgumentException.class,
-        expectedError,
-        () -> actions().rewriteDataFiles(table).sort().binPack());
-
-    AssertHelpers.assertThrows(
-        failureMessage,
-        IllegalArgumentException.class,
-        expectedError,
-        () -> actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
-  }
-
-  /**
-   * Reads the Iceberg table at {@code tableLocation} through Spark, sorted by c1, c2 and c3.
-   *
-   * @return the collected rows converted with {@code rowsToJava}
-   */
-  protected List<Object[]> currentData() {
-    Dataset<Row> sortedRows = spark.read()
-        .format("iceberg")
-        .load(tableLocation)
-        .sort("c1", "c2", "c3");
-    return rowsToJava(sortedRows.collectAsList());
-  }
-
-  /**
-   * Adds up the lengths of all file scan tasks planned by a new scan of the table.
-   *
-   * @param table the table to scan
-   * @return the summed task lengths
-   */
-  protected long testDataSize(Table table) {
-    long totalLength = 0L;
-    for (FileScanTask task : table.newScan().planFiles()) {
-      totalLength += task.length();
-    }
-    return totalLength;
-  }
-
-  /**
-   * Refreshes the table and asserts that a new scan plans more than one file.
-   *
-   * @param table the table to check
-   */
-  protected void shouldHaveMultipleFiles(Table table) {
-    table.refresh();
-    int plannedFiles = Iterables.size(table.newScan().planFiles());
-    String failureMessage = String.format("Should have multiple files, had %d", plannedFiles);
-    Assert.assertTrue(failureMessage, plannedFiles > 1);
-  }
-
-  /**
-   * Refreshes the table and asserts that a new scan plans exactly the expected number of files.
-   *
-   * @param table the table to check
-   * @param numExpected the expected number of planned files
-   */
-  protected void shouldHaveFiles(Table table, int numExpected) {
-    table.refresh();
-    Assert.assertEquals(
-        "Did not have the expected number of files",
-        numExpected,
-        Iterables.size(table.newScan().planFiles()));
-  }
-
-  /**
-   * Refreshes the table and asserts that it has exactly the expected number of snapshots.
-   *
-   * @param table the table to check
-   * @param expectedSnapshots the expected snapshot count
-   */
-  protected void shouldHaveSnapshots(Table table, int expectedSnapshots) {
-    table.refresh();
-    Assert.assertEquals(
-        "Table did not have the expected number of snapshots",
-        expectedSnapshots,
-        Iterables.size(table.snapshots()));
-  }
-
-  /**
-   * Runs the delete-orphan-files action with a cutoff of the current time and asserts that it
-   * reports no orphan file locations.
-   *
-   * @param table the table to check
-   */
-  protected void shouldHaveNoOrphans(Table table) {
-    Iterable<String> orphanLocations = actions().deleteOrphanFiles(table)
-        .olderThan(System.currentTimeMillis())
-        .execute()
-        .orphanFileLocations();
-    Assert.assertEquals("Should not have found any orphan files", ImmutableList.of(), orphanLocations);
-  }
-
-  /**
-   * Asserts that {@code cacheContents(table)} is empty.
-   *
-   * @param table the table whose cache entries are checked
-   */
-  protected void shouldHaveACleanCache(Table table) {
-    Assert.assertEquals("Should not have any entries in cache", ImmutableSet.of(), cacheContents(table));
-  }
-
-  /**
-   * Asserts that no two files added by the current snapshot have overlapping bounds for the column.
-   *
-   * @param table the table to check
-   * @param column the name of the column whose file bounds are compared
-   */
-  protected <T> void shouldHaveLastCommitSorted(Table table, String column) {
-    List<Pair<Pair<T, T>, Pair<T, T>>> overlaps = getOverlappingFiles(table, column);
-    Assert.assertEquals("Found overlapping files", Collections.emptyList(), overlaps);
-  }
-
-  /**
-   * Asserts that at least two files added by the current snapshot have overlapping bounds for the
-   * column.
-   *
-   * @param table the table to check
-   * @param column the name of the column whose file bounds are compared
-   */
-  protected <T> void shouldHaveLastCommitUnsorted(Table table, String column) {
-    List<Pair<Pair<T, T>, Pair<T, T>>> overlaps = getOverlappingFiles(table, column);
-    Assert.assertNotEquals("Found no overlapping files", Collections.emptyList(), overlaps);
-  }
-
-  /**
-   * Finds pairs of files added by the table's current snapshot whose lower/upper bounds for the given
-   * column overlap. Files are only compared with other files in the same partition.
-   *
-   * @param table the table to inspect; it is refreshed first
-   * @param column the column name, looked up case-insensitively in the table schema
-   * @return ordered pairs of (lower, upper) bounds; each pair holds two different files where the
-   *         first file's lower or upper bound lies within the second file's bounds (inclusive)
-   */
-  private <T> List<Pair<Pair<T, T>, Pair<T, T>>> getOverlappingFiles(Table table, String column) {
-    table.refresh();
-    NestedField field = table.schema().caseInsensitiveFindField(column);
-    int columnId = field.fieldId();
-    // Unchecked cast: T is chosen by the caller, the class comes from the column's type
-    Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();
-    Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles())
-        .collect(Collectors.groupingBy(DataFile::partition));
-
-    Stream<Pair<Pair<T, T>, Pair<T, T>>> overlaps =
-        filesByPartition.entrySet().stream().flatMap(entry -> {
-          // One (lower, upper) bound pair per file in this partition
-          List<Pair<T, T>> columnBounds =
-              entry.getValue().stream()
-                  .map(file -> Pair.of(
-                      javaClass.cast(Conversions.fromByteBuffer(field.type(), file.lowerBounds().get(columnId))),
-                      javaClass.cast(Conversions.fromByteBuffer(field.type(), file.upperBounds().get(columnId)))))
-                  .collect(Collectors.toList());
-
-          Comparator<T> comparator = Comparators.forType(field.type().asPrimitiveType());
-
-          // Compare every ordered pair of files, so each unordered pair is checked in both directions
-          List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles = columnBounds.stream()
-              .flatMap(left -> columnBounds.stream().map(right -> Pair.of(left, right)))
-              .filter(filePair -> {
-                Pair<T, T> left = filePair.first();
-                T lLower = left.first();
-                T lUpper = left.second();
-                Pair<T, T> right = filePair.second();
-                T rLower = right.first();
-                T rUpper = right.second();
-                // True when left's upper bound or left's lower bound falls inside [rLower, rUpper]
-                boolean boundsOverlap =
-                    (comparator.compare(lUpper, rLower) >= 0 && comparator.compare(lUpper, rUpper) <= 0) ||
-                        (comparator.compare(lLower, rLower) >= 0 && comparator.compare(lLower, rUpper) <= 0);
-
-                // Reference comparison: skips pairing a file's bounds object with itself
-                return (left != right) && boundsOverlap;
-              })
-              .collect(Collectors.toList());
-          return overlappingFiles.stream();
-        });
-
-    return overlaps.collect(Collectors.toList());
-  }
-
-  /**
-   * Creates an unpartitioned table at {@code tableLocation}, sets its Parquet row group size property
-   * to 1024 bytes, and writes test records to it through {@code writeRecords(files, 40000)}.
-   *
-   * @param files number of files to create
-   * @return the created table
-   */
-  protected Table createTable(int files) {
-    PartitionSpec spec = PartitionSpec.unpartitioned();
-    Map<String, String> options = Maps.newHashMap();
-    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
-    table.updateProperties().set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "1024").commit();
-    // A freshly created table has no snapshot until records are written
-    Assert.assertNull("Table must be empty", table.currentSnapshot());
-
-    writeRecords(files, 40000);
-
-    return table;
-  }
-
-  /**
-   * Creates a table at {@code tableLocation} partitioned by identity(c1) and truncate(c2, 2), and
-   * writes test records to it through {@code writeRecords(files, 2000, partitions)}.
-   *
-   * @param partitions passed to {@code writeRecords} as its partition count
-   * @param files passed to {@code writeRecords} as its file count
-   * @return the created table
-   */
-  protected Table createTablePartitioned(int partitions, int files) {
-    PartitionSpec partitionSpec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
-    Map<String, String> tableProperties = Maps.newHashMap();
-
-    Table created = TABLES.create(SCHEMA, partitionSpec, tableProperties, tableLocation);
-    Assert.assertNull("Table must be empty", created.currentSnapshot());
-
-    writeRecords(files, 2000, partitions);
-    return created;
-  }
-
-  /**
-   * Refreshes the table and computes the mean length of the file scan tasks planned by a new scan.
-   *
-   * @param table the table to measure
-   * @return the mean task length, truncated to an int
-   */
-  protected int averageFileSize(Table table) {
-    table.refresh();
-    double meanLength = Streams.stream(table.newScan().planFiles())
-        .mapToLong(FileScanTask::length)
-        .average()
-        .getAsDouble();
-    return (int) meanLength;
-  }
-
-  /**
-   * Writes records through the three-argument overload with a partition count of 0.
-   *
-   * @param files passed through as the file count
-   * @param numRecords passed through as the record count
-   */
-  private void writeRecords(int files, int numRecords) {
-    int noPartitions = 0;
-    writeRecords(files, numRecords, noPartitions);
-  }
-
-  /**
-   * Builds a square grid of (x, y) pairs with side ceil(sqrt(numRecords)), shuffles it with a fixed
-   * seed, turns each pair into a {@code ThreeColumnRecord}, repartitions the resulting DataFrame into
-   * {@code files} partitions and appends it to the table via {@code writeDF}.
-   *
-   * @param files number of Spark partitions the DataFrame is repartitioned into
-   * @param numRecords determines the grid side; the grid holds side * side records
-   * @param partitions when positive, the first column is x modulo this value; otherwise it is x
-   */
-  private void writeRecords(int files, int numRecords, int partitions) {
-    int side = (int) Math.ceil(Math.sqrt(numRecords));
-
-    // Enumerate the grid in x-major order, then shuffle deterministically
-    List<Pair<Integer, Integer>> cells = Lists.newArrayList();
-    for (int x = 0; x < side; x++) {
-      for (int y = 0; y < side; y++) {
-        cells.add(Pair.of(x, y));
-      }
-    }
-    Collections.shuffle(cells, new Random(42));
-
-    List<ThreeColumnRecord> records = Lists.newArrayList();
-    for (Pair<Integer, Integer> cell : cells) {
-      int firstColumn = partitions > 0 ? cell.first() % partitions : cell.first();
-      records.add(new ThreeColumnRecord(firstColumn, "foo" + cell.first(), "bar" + cell.second()));
-    }
-
-    writeDF(spark.createDataFrame(records, ThreeColumnRecord.class).repartition(files));
-  }
-
-  /**
-   * Selects c1, c2 and c3, sorts within each Spark partition by c1 and c2, and appends the result to
-   * the Iceberg table at {@code tableLocation}.
-   *
-   * @param df the rows to write
-   */
-  private void writeDF(Dataset<Row> df) {
-    Dataset<Row> sortedWithinPartitions = df.select("c1", "c2", "c3").sortWithinPartitions("c1", "c2");
-    sortedWithinPartitions.write().format("iceberg").mode("append").save(tableLocation);
-  }
-
-  /**
-   * Mockito argument matcher that accepts a {@code RewriteFileGroup} when the global index of its info
-   * is one of the indexes given at construction.
-   */
-  class GroupInfoMatcher implements ArgumentMatcher<RewriteFileGroup> {
-    private final Set<Integer> matchingIndexes;
-
-    GroupInfoMatcher(Integer... indexes) {
-      this.matchingIndexes = ImmutableSet.copyOf(indexes);
-    }
-
-    @Override
-    public boolean matches(RewriteFileGroup group) {
-      int globalIndex = group.info().globalIndex();
-      return matchingIndexes.contains(globalIndex);
-    }
-  }
-}