Posted to commits@iceberg.apache.org by op...@apache.org on 2020/10/28 04:19:39 UTC

[iceberg] branch master updated: Core: Extract the BaseRewriteDataFilesAction for implementing both flink and spark rewrite actions.

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new da9a0b2  Core: Extract the BaseRewriteDataFilesAction for implementing both flink and spark rewrite actions.
da9a0b2 is described below

commit da9a0b24f21a4390a5270d94c576b62f3b49caec
Author: JunZhang <zh...@126.com>
AuthorDate: Wed Oct 28 12:19:30 2020 +0800

    Core: Extract the BaseRewriteDataFilesAction for implementing both flink and spark rewrite actions.
---
 .../java/org/apache/iceberg/actions/Action.java    |   0
 .../org/apache/iceberg/actions/BaseAction.java     |  89 ++++++++
 .../actions/BaseRewriteDataFilesAction.java        | 103 +++++----
 .../iceberg/actions/BaseSnapshotUpdateAction.java  |   0
 .../actions/RewriteDataFilesActionResult.java      |   0
 .../iceberg/actions/SnapshotUpdateAction.java      |   0
 .../{BaseAction.java => BaseSparkAction.java}      |  60 +-----
 .../iceberg/actions/ExpireSnapshotsAction.java     |   2 +-
 .../iceberg/actions/RemoveOrphanFilesAction.java   |   2 +-
 .../iceberg/actions/RewriteDataFilesAction.java    | 237 +--------------------
 10 files changed, 150 insertions(+), 343 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/actions/Action.java b/core/src/main/java/org/apache/iceberg/actions/Action.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/actions/Action.java
rename to core/src/main/java/org/apache/iceberg/actions/Action.java
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseAction.java
new file mode 100644
index 0000000..cc6c5fe
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseAction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+abstract class BaseAction<R> implements Action<R> {
+
+  protected abstract Table table();
+
+  protected String metadataTableName(MetadataTableType type) {
+    return metadataTableName(table().name(), type);
+  }
+
+  protected String metadataTableName(String tableName, MetadataTableType type) {
+    if (tableName.contains("/")) {
+      return tableName + "#" + type;
+    } else if (tableName.startsWith("hadoop.")) {
+      // for HadoopCatalog tables, use the table location to load the metadata table
+      // because IcebergCatalog uses HiveCatalog when the table is identified by name
+      return table().location() + "#" + type;
+    } else if (tableName.startsWith("hive.")) {
+      // HiveCatalog prepends a logical name which we need to drop for Spark 2.4
+      return tableName.replaceFirst("hive\\.", "") + "." + type;
+    } else {
+      return tableName + "." + type;
+    }
+  }
+
+  /**
+   * Returns all the path locations of all Manifest Lists for a given list of snapshots
+   *
+   * @param snapshots snapshots
+   * @return the paths of the Manifest Lists
+   */
+  protected List<String> getManifestListPaths(Iterable<Snapshot> snapshots) {
+    List<String> manifestLists = Lists.newArrayList();
+    for (Snapshot snapshot : snapshots) {
+      String manifestListLocation = snapshot.manifestListLocation();
+      if (manifestListLocation != null) {
+        manifestLists.add(manifestListLocation);
+      }
+    }
+    return manifestLists;
+  }
+
+  /**
+   * Returns all Metadata file paths which may not be in the current metadata. Specifically this includes "version-hint"
+   * files as well as entries in metadata.previousFiles.
+   *
+   * @param ops TableOperations for the table we will be getting paths from
+   * @return a list of paths to metadata files
+   */
+  protected List<String> getOtherMetadataFilePaths(TableOperations ops) {
+    List<String> otherMetadataFiles = Lists.newArrayList();
+    otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
+
+    TableMetadata metadata = ops.current();
+    otherMetadataFiles.add(metadata.metadataFileLocation());
+    for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) {
+      otherMetadataFiles.add(previousMetadataFile.file());
+    }
+    return otherMetadataFiles;
+  }
+
+}
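
The name resolution in metadataTableName() above is easiest to follow with concrete inputs. Below is a small,
self-contained sketch that mirrors the same branching; the table location, database/table names and the choice of
MetadataTableType are illustrative only, and the extra tableLocation parameter stands in for table().location(),
which the real protected method reads itself:

    import org.apache.iceberg.MetadataTableType;

    public class MetadataTableNameExample {
      // Mirrors BaseAction#metadataTableName(String, MetadataTableType) from the file above.
      static String metadataTableName(String tableName, String tableLocation, MetadataTableType type) {
        if (tableName.contains("/")) {
          return tableName + "#" + type;                              // path-based table: <location>#<TYPE>
        } else if (tableName.startsWith("hadoop.")) {
          return tableLocation + "#" + type;                          // HadoopCatalog: load by location, not name
        } else if (tableName.startsWith("hive.")) {
          return tableName.replaceFirst("hive\\.", "") + "." + type;  // drop the catalog prefix for Spark 2.4
        } else {
          return tableName + "." + type;
        }
      }

      public static void main(String[] args) {
        String location = "hdfs://nn:8020/warehouse/db/tbl";          // illustrative location
        System.out.println(metadataTableName("hive.db.tbl", location, MetadataTableType.ENTRIES));
        // -> db.tbl.ENTRIES
        System.out.println(metadataTableName("hadoop.db.tbl", location, MetadataTableType.ENTRIES));
        // -> hdfs://nn:8020/warehouse/db/tbl#ENTRIES
        System.out.println(metadataTableName("/warehouse/db/tbl", location, MetadataTableType.ENTRIES));
        // -> /warehouse/db/tbl#ENTRIES
      }
    }
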
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
similarity index 79%
copy from spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
copy to core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
index e1d179a..df547f5 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
@@ -45,42 +45,33 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
-import org.apache.iceberg.spark.SparkUtil;
-import org.apache.iceberg.spark.source.RowDataRewriter;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.StructLikeWrapper;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.iceberg.util.Tasks;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RewriteDataFilesAction
-    extends BaseSnapshotUpdateAction<RewriteDataFilesAction, RewriteDataFilesActionResult> {
+public abstract class BaseRewriteDataFilesAction<ThisT>
+    extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesAction.class);
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesAction.class);
 
-  private final JavaSparkContext sparkContext;
   private final Table table;
   private final FileIO fileIO;
   private final EncryptionManager encryptionManager;
-  private final boolean caseSensitive;
+  private boolean caseSensitive;
+  private PartitionSpec spec;
+  private Expression filter;
   private long targetSizeInBytes;
   private int splitLookback;
   private long splitOpenFileCost;
 
-  private PartitionSpec spec = null;
-  private Expression filter;
-
-  RewriteDataFilesAction(SparkSession spark, Table table) {
-    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+  public BaseRewriteDataFilesAction(Table table) {
     this.table = table;
     this.spec = table.spec();
     this.filter = Expressions.alwaysTrue();
-    this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive", "false"));
+    this.caseSensitive = false;
 
     long splitSize = PropertyUtil.propertyAsLong(
         table.properties(),
@@ -101,27 +92,41 @@ public class RewriteDataFilesAction
         TableProperties.SPLIT_OPEN_FILE_COST,
         TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
 
-    this.fileIO = SparkUtil.serializableFileIO(table);
+    this.fileIO = fileIO();
     this.encryptionManager = table.encryption();
   }
 
   @Override
-  protected RewriteDataFilesAction self() {
-    return this;
-  }
-
-  @Override
   protected Table table() {
     return table;
   }
 
+  protected EncryptionManager encryptionManager() {
+    return encryptionManager;
+  }
+
+  protected boolean caseSensitive() {
+    return caseSensitive;
+  }
+
+  /**
+   * Set whether the file scan used for the rewrite should be case sensitive.
+   *
+   * @param newCaseSensitive whether the file scan is case sensitive
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> caseSensitive(boolean newCaseSensitive) {
+    this.caseSensitive = newCaseSensitive;
+    return this;
+  }
+
   /**
    * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
    *
    * @param specId PartitionSpec id to rewrite
    * @return this for method chaining
    */
-  public RewriteDataFilesAction outputSpecId(int specId) {
+  public BaseRewriteDataFilesAction<ThisT> outputSpecId(int specId) {
     Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
     this.spec = table.specs().get(specId);
     return this;
@@ -133,7 +138,7 @@ public class RewriteDataFilesAction
    * @param targetSize size in bytes of rewrite data file
    * @return this for method chaining
    */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
+  public BaseRewriteDataFilesAction<ThisT> targetSizeInBytes(long targetSize) {
     Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
         targetSize);
     this.targetSizeInBytes = targetSize;
@@ -141,9 +146,9 @@ public class RewriteDataFilesAction
   }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
+   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
+   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
+   * planning cost.
    * <p>
    * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
    * metadata, user can use a lookback of 1.
@@ -151,7 +156,7 @@ public class RewriteDataFilesAction
    * @param lookback number of "bins" considered when trying to pack the next file split into a task.
    * @return this for method chaining
    */
-  public RewriteDataFilesAction splitLookback(int lookback) {
+  public BaseRewriteDataFilesAction<ThisT> splitLookback(int lookback) {
     Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
     this.splitLookback = lookback;
     return this;
@@ -161,13 +166,13 @@ public class RewriteDataFilesAction
    * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
    * threshold, Iceberg will use this value to do count.
    * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
+   * this configuration controls the number of files to compact for each task, small value would lead to a high
+   * compaction, the default value is 4MB.
    *
    * @param openFileCost minimum file size to count to pack into one "bin".
    * @return this for method chaining
    */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
+  public BaseRewriteDataFilesAction<ThisT> splitOpenFileCost(long openFileCost) {
     Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
     this.splitOpenFileCost = openFileCost;
     return this;
@@ -180,7 +185,7 @@ public class RewriteDataFilesAction
    * @param expr Expression to filter out DataFiles
    * @return this for method chaining
    */
-  public RewriteDataFilesAction filter(Expression expr) {
+  public BaseRewriteDataFilesAction<ThisT> filter(Expression expr) {
     this.filter = Expressions.and(filter, expr);
     return this;
   }
@@ -213,7 +218,6 @@ public class RewriteDataFilesAction
     if (filteredGroupedTasks.isEmpty()) {
       return RewriteDataFilesActionResult.empty();
     }
-
     // Split and combine tasks under each partition
     List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
         .map(scanTasks -> {
@@ -224,14 +228,7 @@ public class RewriteDataFilesAction
         .flatMap(Streams::stream)
         .collect(Collectors.toList());
 
-    JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
-
-    Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
-    Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager);
-
-    RowDataRewriter rowDataRewriter = new RowDataRewriter(table, spec, caseSensitive, io, encryption);
-
-    List<DataFile> addedDataFiles = rowDataRewriter.rewriteDataForTasks(taskRDD);
+    List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
     List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
         .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
         .collect(Collectors.toList());
@@ -240,25 +237,19 @@ public class RewriteDataFilesAction
     return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
   }
 
+
   private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
       CloseableIterator<FileScanTask> tasksIter) {
     ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
         Maps.newHashMap(), Lists::newArrayList);
-
-    try {
-      tasksIter.forEachRemaining(task -> {
+    try (CloseableIterator<FileScanTask> iterator = tasksIter) {
+      iterator.forEachRemaining(task -> {
         StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
         tasksGroupedByPartition.put(structLike, task);
       });
-
-    } finally {
-      try {
-        tasksIter.close();
-      } catch (IOException ioe) {
-        LOG.warn("Failed to close task iterator", ioe);
-      }
+    } catch (IOException e) {
+      LOG.warn("Failed to close task iterator", e);
     }
-
     return tasksGroupedByPartition.asMap();
   }
 
@@ -267,15 +258,17 @@ public class RewriteDataFilesAction
       RewriteFiles rewriteFiles = table.newRewrite();
       rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
       commit(rewriteFiles);
-
     } catch (Exception e) {
       Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
           .noRetry()
           .suppressFailureWhenFinished()
           .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
           .run(fileIO::deleteFile);
-
       throw e;
     }
   }
+
+  protected abstract FileIO fileIO();
+
+  protected abstract List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTask);
 }
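
With the Spark-specific pieces removed, an engine integration only has to supply the two hooks declared at the
bottom of this class, plus the self() hook from BaseSnapshotUpdateAction. The skeleton below is a hypothetical,
non-distributed sketch to show the shape of such a subclass; the class name and the body of rewriteDataForTasks()
are illustrative and are not part of this commit (the real Flink action lands separately):

    import java.util.List;
    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
    import org.apache.iceberg.io.FileIO;

    public class LocalRewriteDataFilesAction extends BaseRewriteDataFilesAction<LocalRewriteDataFilesAction> {

      public LocalRewriteDataFilesAction(Table table) {
        super(table);  // picks up spec, filter, target size and split properties from the table
      }

      @Override
      protected LocalRewriteDataFilesAction self() {
        return this;  // required by BaseSnapshotUpdateAction for fluent chaining
      }

      @Override
      protected FileIO fileIO() {
        return table().io();  // nothing to broadcast outside of a distributed engine
      }

      @Override
      protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
        // A real engine distributes these combined tasks (Spark parallelizes them into an RDD,
        // Flink would submit them as a job), rewrites the rows, and returns the new DataFiles.
        throw new UnsupportedOperationException("plug an engine-specific row rewriter in here");
      }
    }
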
diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseSnapshotUpdateAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseSnapshotUpdateAction.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/actions/BaseSnapshotUpdateAction.java
rename to core/src/main/java/org/apache/iceberg/actions/BaseSnapshotUpdateAction.java
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java
rename to core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java
diff --git a/spark/src/main/java/org/apache/iceberg/actions/SnapshotUpdateAction.java b/core/src/main/java/org/apache/iceberg/actions/SnapshotUpdateAction.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/actions/SnapshotUpdateAction.java
rename to core/src/main/java/org/apache/iceberg/actions/SnapshotUpdateAction.java
diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java b/spark/src/main/java/org/apache/iceberg/actions/BaseSparkAction.java
similarity index 64%
rename from spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
rename to spark/src/main/java/org/apache/iceberg/actions/BaseSparkAction.java
index e5898aa..dfbb59a 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/BaseSparkAction.java
@@ -24,14 +24,11 @@ import java.util.List;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.io.ClosingIterator;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -41,62 +38,7 @@ import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
-abstract class BaseAction<R> implements Action<R> {
-
-  protected abstract Table table();
-
-  protected String metadataTableName(MetadataTableType type) {
-    return metadataTableName(table().name(), type);
-  }
-
-  protected String metadataTableName(String tableName, MetadataTableType type) {
-    if (tableName.contains("/")) {
-      return tableName + "#" + type;
-    } else if (tableName.startsWith("hadoop.")) {
-      // for HadoopCatalog tables, use the table location to load the metadata table
-      // because IcebergCatalog uses HiveCatalog when the table is identified by name
-      return table().location() + "#" + type;
-    } else if (tableName.startsWith("hive.")) {
-      // HiveCatalog prepend a logical name which we need to drop for Spark 2.4
-      return tableName.replaceFirst("hive\\.", "") + "." + type;
-    } else {
-      return tableName + "." + type;
-    }
-  }
-
-  /**
-   * Returns all the path locations of all Manifest Lists for a given list of snapshots
-   * @param snapshots snapshots
-   * @return the paths of the Manifest Lists
-   */
-  private List<String> getManifestListPaths(Iterable<Snapshot> snapshots) {
-    List<String> manifestLists = Lists.newArrayList();
-    for (Snapshot snapshot : snapshots) {
-      String manifestListLocation = snapshot.manifestListLocation();
-      if (manifestListLocation != null) {
-        manifestLists.add(manifestListLocation);
-      }
-    }
-    return manifestLists;
-  }
-
-  /**
-   * Returns all Metadata file paths which may not be in the current metadata. Specifically
-   * this includes "version-hint" files as well as entries in metadata.previousFiles.
-   * @param ops TableOperations for the table we will be getting paths from
-   * @return a list of paths to metadata files
-   */
-  private List<String> getOtherMetadataFilePaths(TableOperations ops) {
-    List<String> otherMetadataFiles = Lists.newArrayList();
-    otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
-
-    TableMetadata metadata = ops.current();
-    otherMetadataFiles.add(metadata.metadataFileLocation());
-    for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) {
-      otherMetadataFiles.add(previousMetadataFile.file());
-    }
-    return otherMetadataFiles;
-  }
+abstract class BaseSparkAction<R> extends BaseAction<R> implements Action<R> {
 
   protected Dataset<Row> buildValidDataFileDF(SparkSession spark) {
     return buildValidDataFileDF(spark, table().name());
diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
index ec759ef..0a08d7a 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
@@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory;
  * are issued. Deletes are still performed locally after retrieving the results from the Spark executors.
  */
 @SuppressWarnings("UnnecessaryAnonymousClass")
-public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotsActionResult> {
+public class ExpireSnapshotsAction extends BaseSparkAction<ExpireSnapshotsActionResult> {
   private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsAction.class);
 
   private static final String DATA_FILE = "Data File";
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
index 13684fa..c38fe84 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
@@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory;
  * <em>Note:</em> It is dangerous to call this action with a short retention interval as it might corrupt
  * the state of the table if another operation is writing at the same time.
  */
-public class RemoveOrphanFilesAction extends BaseAction<List<String>> {
+public class RemoveOrphanFilesAction extends BaseSparkAction<List<String>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
   private static final UserDefinedFunction filename = functions.udf((String path) -> {
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
index e1d179a..b22da52 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
@@ -19,90 +19,27 @@
 
 package org.apache.iceberg.actions;
 
-import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.source.RowDataRewriter;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.StructLikeWrapper;
-import org.apache.iceberg.util.TableScanUtil;
-import org.apache.iceberg.util.Tasks;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class RewriteDataFilesAction
-    extends BaseSnapshotUpdateAction<RewriteDataFilesAction, RewriteDataFilesActionResult> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesAction.class);
+    extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
 
   private final JavaSparkContext sparkContext;
-  private final Table table;
-  private final FileIO fileIO;
-  private final EncryptionManager encryptionManager;
-  private final boolean caseSensitive;
-  private long targetSizeInBytes;
-  private int splitLookback;
-  private long splitOpenFileCost;
-
-  private PartitionSpec spec = null;
-  private Expression filter;
 
   RewriteDataFilesAction(SparkSession spark, Table table) {
+    super(table);
     this.sparkContext = new JavaSparkContext(spark.sparkContext());
-    this.table = table;
-    this.spec = table.spec();
-    this.filter = Expressions.alwaysTrue();
-    this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive", "false"));
-
-    long splitSize = PropertyUtil.propertyAsLong(
-        table.properties(),
-        TableProperties.SPLIT_SIZE,
-        TableProperties.SPLIT_SIZE_DEFAULT);
-    long targetFileSize = PropertyUtil.propertyAsLong(
-        table.properties(),
-        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
-        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
-
-    this.splitLookback = PropertyUtil.propertyAsInt(
-        table.properties(),
-        TableProperties.SPLIT_LOOKBACK,
-        TableProperties.SPLIT_LOOKBACK_DEFAULT);
-    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
-        table.properties(),
-        TableProperties.SPLIT_OPEN_FILE_COST,
-        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
-
-    this.fileIO = SparkUtil.serializableFileIO(table);
-    this.encryptionManager = table.encryption();
   }
 
   @Override
@@ -111,171 +48,17 @@ public class RewriteDataFilesAction
   }
 
   @Override
-  protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
-  }
-
-  /**
-   * Specify the target rewrite data file size in bytes
-   *
-   * @param targetSize size in bytes of rewrite data file
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
-    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
-        targetSize);
-    this.targetSizeInBytes = targetSize;
-    return this;
-  }
-
-  /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
-   *
-   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitLookback(int lookback) {
-    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
-    this.splitLookback = lookback;
-    return this;
-  }
-
-  /**
-   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
-   * threshold, Iceberg will use this value to do count.
-   * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
-   *
-   * @param openFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
-    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
-    this.splitOpenFileCost = openFileCost;
-    return this;
-  }
-
-  /**
-   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
-   * filter may be rewritten.
-   *
-   * @param expr Expression to filter out DataFiles
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction filter(Expression expr) {
-    this.filter = Expressions.and(filter, expr);
-    return this;
+  protected FileIO fileIO() {
+    return SparkUtil.serializableFileIO(table());
   }
 
   @Override
-  public RewriteDataFilesActionResult execute() {
-    CloseableIterable<FileScanTask> fileScanTasks = null;
-    try {
-      fileScanTasks = table.newScan()
-          .caseSensitive(caseSensitive)
-          .ignoreResiduals()
-          .filter(filter)
-          .planFiles();
-    } finally {
-      try {
-        if (fileScanTasks != null) {
-          fileScanTasks.close();
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Failed to close task iterable", ioe);
-      }
-    }
-
-    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
-    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
-        .filter(kv -> kv.getValue().size() > 1)
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
-    // Nothing to rewrite if there's only one DataFile in each partition.
-    if (filteredGroupedTasks.isEmpty()) {
-      return RewriteDataFilesActionResult.empty();
-    }
-
-    // Split and combine tasks under each partition
-    List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
-        .map(scanTasks -> {
-          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
-              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
-          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
-        })
-        .flatMap(Streams::stream)
-        .collect(Collectors.toList());
-
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
     JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
-
-    Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
-    Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager);
-
-    RowDataRewriter rowDataRewriter = new RowDataRewriter(table, spec, caseSensitive, io, encryption);
-
-    List<DataFile> addedDataFiles = rowDataRewriter.rewriteDataForTasks(taskRDD);
-    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
-        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
-        .collect(Collectors.toList());
-    replaceDataFiles(currentDataFiles, addedDataFiles);
-
-    return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
-  }
-
-  private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
-      CloseableIterator<FileScanTask> tasksIter) {
-    ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
-        Maps.newHashMap(), Lists::newArrayList);
-
-    try {
-      tasksIter.forEachRemaining(task -> {
-        StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
-        tasksGroupedByPartition.put(structLike, task);
-      });
-
-    } finally {
-      try {
-        tasksIter.close();
-      } catch (IOException ioe) {
-        LOG.warn("Failed to close task iterator", ioe);
-      }
-    }
-
-    return tasksGroupedByPartition.asMap();
-  }
-
-  private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
-    try {
-      RewriteFiles rewriteFiles = table.newRewrite();
-      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
-      commit(rewriteFiles);
-
-    } catch (Exception e) {
-      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
-          .noRetry()
-          .suppressFailureWhenFinished()
-          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-          .run(fileIO::deleteFile);
-
-      throw e;
-    }
+    Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
+    Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
+    RowDataRewriter rowDataRewriter =
+        new RowDataRewriter(table(), table().spec(), caseSensitive(), io, encryption);
+    return rowDataRewriter.rewriteDataForTasks(taskRDD);
   }
 }
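
For end users of the Spark action nothing changes: the builder methods are now inherited from
BaseRewriteDataFilesAction rather than defined locally. A minimal usage sketch, assuming the existing Actions
factory in the Spark module and the usual accessors on RewriteDataFilesActionResult (neither is touched by this
diff), with a hypothetical loadTable() helper standing in for catalog access:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.Actions;
    import org.apache.iceberg.actions.RewriteDataFilesActionResult;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.spark.sql.SparkSession;

    public class RewriteExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("rewrite-data-files").getOrCreate();
        Table table = loadTable();  // hypothetical helper: load the table from your catalog

        RewriteDataFilesActionResult result = Actions.forTable(spark, table)
            .rewriteDataFiles()
            .caseSensitive(true)                               // new knob; defaults to false in the base action
            .filter(Expressions.equal("date", "2020-10-28"))   // only compact files that may match
            .targetSizeInBytes(512 * 1024 * 1024L)             // 512 MB output files
            .splitLookback(10)
            .splitOpenFileCost(4 * 1024 * 1024L)
            .execute();

        System.out.printf("Rewrote %d data files into %d data files%n",
            result.deletedDataFiles().size(), result.addedDataFiles().size());
      }

      private static Table loadTable() {
        throw new UnsupportedOperationException("load the Iceberg table via HiveCatalog/HadoopCatalog here");
      }
    }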