Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/07/12 23:00:54 UTC

[iceberg] branch master updated: Spark 3.2: Expose action classes in SparkActions (#5261)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d2edd6284 Spark 3.2: Expose action classes in SparkActions (#5261)
6d2edd6284 is described below

commit 6d2edd6284ebc5301dbe45376a31ca8316852a77
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Jul 12 16:00:50 2022 -0700

    Spark 3.2: Expose action classes in SparkActions (#5261)
---
 .../actions/BaseDeleteOrphanFilesSparkAction.java  | 318 +-------------
 .../BaseDeleteReachableFilesSparkAction.java       | 158 +------
 .../actions/BaseExpireSnapshotsSparkAction.java    | 228 +---------
 .../spark/actions/BaseMigrateTableSparkAction.java | 189 +--------
 .../actions/BaseRewriteDataFilesSparkAction.java   | 459 +--------------------
 .../actions/BaseRewriteManifestsSparkAction.java   | 348 +---------------
 .../actions/BaseSnapshotTableSparkAction.java      | 179 +-------
 .../actions/BaseSnapshotUpdateSparkAction.java     |   5 +-
 .../iceberg/spark/actions/BaseSparkAction.java     |   5 +-
 .../actions/BaseTableCreationSparkAction.java      |   2 +-
 ...tion.java => DeleteOrphanFilesSparkAction.java} |  20 +-
 ...n.java => DeleteReachableFilesSparkAction.java} |  16 +-
 ...Action.java => ExpireSnapshotsSparkAction.java} |  20 +-
 ...arkAction.java => MigrateTableSparkAction.java} |  14 +-
 ...ction.java => RewriteDataFilesSparkAction.java} |  21 +-
 ...ction.java => RewriteManifestsSparkAction.java} |  20 +-
 ...rkAction.java => SnapshotTableSparkAction.java} |  18 +-
 .../apache/iceberg/spark/actions/SparkActions.java |  21 +-
 .../spark/procedures/ExpireSnapshotsProcedure.java |   4 +-
 .../procedures/RemoveOrphanFilesProcedure.java     |   6 +-
 .../procedures/RewriteManifestsProcedure.java      |   5 +-
 .../spark/actions/TestExpireSnapshotsAction.java   |   2 +-
 .../spark/actions/TestRemoveOrphanFilesAction.java |   8 +-
 .../spark/actions/TestRewriteDataFilesAction.java  |   2 +-
 24 files changed, 148 insertions(+), 1920 deletions(-)
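
(Illustration, not part of the patch.) A minimal sketch of what the exposed classes look like from user code, assuming the Iceberg 0.14 Spark 3.2 API: actions are still obtained through SparkActions, but the returned types are now the concrete *SparkAction classes instead of the deprecated Base* wrappers. The session/table wiring below is placeholder code.

    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.DeleteOrphanFiles;
    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
    import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class SparkActionsUsageSketch {
      public static void run(SparkSession spark, Table table) {
        // The entry point is still SparkActions; after this change the returned types
        // are the concrete *SparkAction classes rather than the Base* wrappers.
        DeleteOrphanFilesSparkAction deleteOrphans = SparkActions.get(spark).deleteOrphanFiles(table);
        DeleteOrphanFiles.Result orphanResult =
            deleteOrphans.olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3)).execute();

        ExpireSnapshotsSparkAction expireSnapshots = SparkActions.get(spark).expireSnapshots(table);
        ExpireSnapshots.Result expireResult = expireSnapshots.retainLast(5).execute();
      }
    }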

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
index dc58a05d4d..7cbcf164d4 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
@@ -19,346 +19,52 @@
 
 package org.apache.iceberg.spark.actions;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
-import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.hadoop.HiddenPathFilter;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.UserDefinedFunction;
-import org.apache.spark.sql.functions;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.util.SerializableConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.GC_ENABLED;
-import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
 /**
- * An action that removes orphan metadata, data and delete files by listing a given location and comparing
- * the actual files in that location with content and metadata files referenced by all valid snapshots.
- * The location must be accessible for listing via the Hadoop {@link FileSystem}.
- * <p>
- * By default, this action cleans up the table location returned by {@link Table#location()} and
- * removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can be modified
- * by passing a custom location to {@link #location} and a custom timestamp to {@link #olderThan(long)}.
- * For example, someone might point this action to the data folder to clean up only orphan data files.
- * <p>
- * Configure an alternative delete method using {@link #deleteWith(Consumer)}.
- * <p>
- * For full control of the set of files being evaluated, use the {@link #compareToFileList(Dataset)} argument.  This
- * skips the directory listing - any files in the dataset provided which are not found in table metadata will
- * be deleted, using the same {@link Table#location()} and {@link #olderThan(long)} filtering as above.
- * <p>
- * <em>Note:</em> It is dangerous to call this action with a short retention interval as it might corrupt
- * the state of the table if another operation is writing at the same time.
+ * An action to delete orphan files.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ *             use {@link SparkActions} and {@link DeleteOrphanFilesSparkAction} instead.
  */
-public class BaseDeleteOrphanFilesSparkAction
-    extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> implements DeleteOrphanFiles {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
-  private final SerializableConfiguration hadoopConf;
-  private final int partitionDiscoveryParallelism;
-  private final Table table;
-  private final Consumer<String> defaultDelete = new Consumer<String>() {
-    @Override
-    public void accept(String file) {
-      table.io().deleteFile(file);
-    }
-  };
-
-  private String location = null;
-  private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
-  private Dataset<Row> compareToFileList;
-  private Consumer<String> deleteFunc = defaultDelete;
-  private ExecutorService deleteExecutorService = null;
+@Deprecated
+public class BaseDeleteOrphanFilesSparkAction extends DeleteOrphanFilesSparkAction {
 
   public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
-    super(spark);
-
-    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
-    this.partitionDiscoveryParallelism = spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
-    this.table = table;
-    this.location = table.location();
-
-    ValidationException.check(
-        PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
-        "Cannot delete orphan files: GC is disabled (deleting files may corrupt other tables)");
-  }
-
-  @Override
-  protected DeleteOrphanFiles self() {
-    return this;
+    super(spark, table);
   }
 
   @Override
   public BaseDeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
-    this.deleteExecutorService = executorService;
+    super.executeDeleteWith(executorService);
     return this;
   }
 
   @Override
   public BaseDeleteOrphanFilesSparkAction location(String newLocation) {
-    this.location = newLocation;
+    super.location(newLocation);
     return this;
   }
 
   @Override
   public BaseDeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
-    this.olderThanTimestamp = newOlderThanTimestamp;
+    super.olderThan(newOlderThanTimestamp);
     return this;
   }
 
   @Override
   public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
-    this.deleteFunc = newDeleteFunc;
+    super.deleteWith(newDeleteFunc);
     return this;
   }
 
   public BaseDeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
-    StructType schema = files.schema();
-
-    StructField filePathField = schema.apply(FILE_PATH);
-    Preconditions.checkArgument(
-        filePathField.dataType() == DataTypes.StringType,
-        "Invalid %s column: %s is not a string",
-        FILE_PATH,
-        filePathField.dataType());
-
-    StructField lastModifiedField = schema.apply(LAST_MODIFIED);
-    Preconditions.checkArgument(
-        lastModifiedField.dataType() == DataTypes.TimestampType,
-        "Invalid %s column: %s is not a timestamp",
-        LAST_MODIFIED,
-        lastModifiedField.dataType());
-
-    this.compareToFileList = files;
+    super.compareToFileList(files);
     return this;
   }
-
-  private Dataset<Row> filteredCompareToFileList() {
-    Dataset<Row> files = compareToFileList;
-    if (location != null) {
-      files = files.filter(files.col(FILE_PATH).startsWith(location));
-    }
-    return files
-        .filter(files.col(LAST_MODIFIED).lt(new Timestamp(olderThanTimestamp)))
-        .select(files.col(FILE_PATH));
-  }
-
-  @Override
-  public DeleteOrphanFiles.Result execute() {
-    JobGroupInfo info = newJobGroupInfo("DELETE-ORPHAN-FILES", jobDesc());
-    return withJobGroupInfo(info, this::doExecute);
-  }
-
-  private String jobDesc() {
-    List<String> options = Lists.newArrayList();
-    options.add("older_than=" + olderThanTimestamp);
-    if (location != null) {
-      options.add("location=" + location);
-    }
-    return String.format("Deleting orphan files (%s) from %s", Joiner.on(',').join(options), table.name());
-  }
-
-  private DeleteOrphanFiles.Result doExecute() {
-    Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
-    Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
-    Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
-    Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
-
-    Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
-    Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
-    Column nameEqual = actualFileName.equalTo(validFileName);
-    Column actualContains = actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
-    Column joinCond = nameEqual.and(actualContains);
-    List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
-        .as(Encoders.STRING())
-        .collectAsList();
-
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
-
-    return new BaseDeleteOrphanFilesActionResult(orphanFiles);
-  }
-
-  private Dataset<Row> buildActualFileDF() {
-    List<String> subDirs = Lists.newArrayList();
-    List<String> matchingFiles = Lists.newArrayList();
-
-    Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
-    PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
-
-    // list at most 3 levels and only dirs that have less than 10 direct sub dirs on the driver
-    listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, pathFilter, matchingFiles);
-
-    JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
-
-    if (subDirs.isEmpty()) {
-      return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
-    }
-
-    int parallelism = Math.min(subDirs.size(), partitionDiscoveryParallelism);
-    JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);
-
-    Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
-    JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(
-        listDirsRecursively(conf, olderThanTimestamp, pathFilter)
-    );
-
-    JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
-    return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
-  }
-
-  private static void listDirRecursively(
-      String dir, Predicate<FileStatus> predicate, Configuration conf, int maxDepth,
-      int maxDirectSubDirs, List<String> remainingSubDirs, PathFilter pathFilter, List<String> matchingFiles) {
-
-    // stop listing whenever we reach the max depth
-    if (maxDepth <= 0) {
-      remainingSubDirs.add(dir);
-      return;
-    }
-
-    try {
-      Path path = new Path(dir);
-      FileSystem fs = path.getFileSystem(conf);
-
-      List<String> subDirs = Lists.newArrayList();
-
-      for (FileStatus file : fs.listStatus(path, pathFilter)) {
-        if (file.isDirectory()) {
-          subDirs.add(file.getPath().toString());
-        } else if (file.isFile() && predicate.test(file)) {
-          matchingFiles.add(file.getPath().toString());
-        }
-      }
-
-      // stop listing if the number of direct sub dirs is bigger than maxDirectSubDirs
-      if (subDirs.size() > maxDirectSubDirs) {
-        remainingSubDirs.addAll(subDirs);
-        return;
-      }
-
-      for (String subDir : subDirs) {
-        listDirRecursively(
-            subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, pathFilter, matchingFiles);
-      }
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
-  }
-
-  private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
-      Broadcast<SerializableConfiguration> conf,
-      long olderThanTimestamp,
-      PathFilter pathFilter) {
-
-    return dirs -> {
-      List<String> subDirs = Lists.newArrayList();
-      List<String> files = Lists.newArrayList();
-
-      Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
-
-      int maxDepth = 2000;
-      int maxDirectSubDirs = Integer.MAX_VALUE;
-
-      dirs.forEachRemaining(dir -> {
-        listDirRecursively(
-                dir, predicate, conf.value().value(), maxDepth, maxDirectSubDirs, subDirs, pathFilter, files);
-      });
-
-      if (!subDirs.isEmpty()) {
-        throw new RuntimeException("Could not list subdirectories, reached maximum subdirectory depth: " + maxDepth);
-      }
-
-      return files.iterator();
-    };
-  }
-
-  /**
-   * A {@link PathFilter} that filters out hidden path, but does not filter out paths that would be marked
-   * as hidden by {@link HiddenPathFilter} due to a partition field that starts with one of the characters that
-   * indicate a hidden path.
-   */
-  @VisibleForTesting
-  static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {
-
-    private final Set<String> hiddenPathPartitionNames;
-
-    PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
-      this.hiddenPathPartitionNames = hiddenPathPartitionNames;
-    }
-
-    @Override
-    public boolean accept(Path path) {
-      boolean isHiddenPartitionPath = hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
-      return isHiddenPartitionPath || HiddenPathFilter.get().accept(path);
-    }
-
-    static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
-      if (specs == null) {
-        return HiddenPathFilter.get();
-      }
-
-      Set<String> partitionNames = specs.values().stream()
-          .map(PartitionSpec::fields)
-          .flatMap(List::stream)
-          .filter(partitionField -> partitionField.name().startsWith("_") || partitionField.name().startsWith("."))
-          .map(partitionField -> partitionField.name() + "=")
-          .collect(Collectors.toSet());
-
-      return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
-    }
-  }
 }
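
(Illustration, not part of the patch.) The removed javadoc above describes the compareToFileList(Dataset) path, which skips the directory listing and evaluates only the supplied files. A hedged sketch of that usage follows; the file_path/last_modified column names are assumed from the schema checks that were inlined here, so verify them against DeleteOrphanFilesSparkAction before relying on this.

    import java.util.List;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.DeleteOrphanFiles;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class CompareToFileListSketch {
      public static void run(SparkSession spark, Table table, List<Row> candidateFiles) {
        // Rows of (file_path: string, last_modified: timestamp), the schema this action
        // validates before comparing the supplied list against the table's valid files.
        StructType schema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("file_path", DataTypes.StringType, false),
            DataTypes.createStructField("last_modified", DataTypes.TimestampType, false)
        });
        Dataset<Row> files = spark.createDataFrame(candidateFiles, schema);

        DeleteOrphanFiles.Result result = SparkActions.get(spark)
            .deleteOrphanFiles(table)
            .compareToFileList(files)               // skip the directory listing entirely
            .olderThan(System.currentTimeMillis())  // same older-than filter as the listing path
            .execute();
      }
    }
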
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
index fc2a5fa5cf..0c69e4e1e6 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
@@ -19,165 +19,21 @@
 
 package org.apache.iceberg.spark.actions;
 
-import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableMetadataParser;
-import org.apache.iceberg.actions.BaseDeleteReachableFilesActionResult;
-import org.apache.iceberg.actions.DeleteReachableFiles;
-import org.apache.iceberg.exceptions.NotFoundException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.GC_ENABLED;
-import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
 /**
- * An implementation of {@link DeleteReachableFiles} that uses metadata tables in Spark
- * to determine which files should be deleted.
+ * An action that deletes reachable files from a given metadata file.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ *             use {@link SparkActions} and {@link DeleteReachableFilesSparkAction} instead.
  */
-@SuppressWarnings("UnnecessaryAnonymousClass")
-public class BaseDeleteReachableFilesSparkAction
-    extends BaseSparkAction<DeleteReachableFiles, DeleteReachableFiles.Result> implements DeleteReachableFiles {
+@Deprecated
+public class BaseDeleteReachableFilesSparkAction extends DeleteReachableFilesSparkAction {
 
   public static final String STREAM_RESULTS = "stream-results";
   public static final boolean STREAM_RESULTS_DEFAULT = false;
 
-  private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteReachableFilesSparkAction.class);
-
-  private final String metadataFileLocation;
-  private final Consumer<String> defaultDelete = new Consumer<String>() {
-    @Override
-    public void accept(String file) {
-      io.deleteFile(file);
-    }
-  };
-
-  private Consumer<String> deleteFunc = defaultDelete;
-  private ExecutorService deleteExecutorService = null;
-  private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf());
-
   public BaseDeleteReachableFilesSparkAction(SparkSession spark, String metadataFileLocation) {
-    super(spark);
-    this.metadataFileLocation = metadataFileLocation;
-  }
-
-  @Override
-  protected DeleteReachableFiles self() {
-    return this;
-  }
-
-  @Override
-  public DeleteReachableFiles io(FileIO fileIO) {
-    this.io = fileIO;
-    return this;
-  }
-
-  @Override
-  public DeleteReachableFiles deleteWith(Consumer<String> newDeleteFunc) {
-    this.deleteFunc = newDeleteFunc;
-    return this;
-  }
-
-  @Override
-  public DeleteReachableFiles executeDeleteWith(ExecutorService executorService) {
-    this.deleteExecutorService = executorService;
-    return this;
-  }
-
-  @Override
-  public Result execute() {
-    Preconditions.checkArgument(io != null, "File IO cannot be null");
-    String jobDesc = String.format("Deleting files reachable from %s", metadataFileLocation);
-    JobGroupInfo info = newJobGroupInfo("DELETE-REACHABLE-FILES", jobDesc);
-    return withJobGroupInfo(info, this::doExecute);
-  }
-
-  private Result doExecute() {
-    TableMetadata metadata = TableMetadataParser.read(io, metadataFileLocation);
-
-    ValidationException.check(
-        PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
-        "Cannot delete files: GC is disabled (deleting files may corrupt other tables)");
-
-    Dataset<Row> reachableFileDF = buildReachableFileDF(metadata).distinct();
-
-    boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
-    if (streamResults) {
-      return deleteFiles(reachableFileDF.toLocalIterator());
-    } else {
-      return deleteFiles(reachableFileDF.collectAsList().iterator());
-    }
-  }
-
-  private Dataset<Row> buildReachableFileDF(TableMetadata metadata) {
-    Table staticTable = newStaticTable(metadata, io);
-    return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
-        .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
-        .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST))
-        .union(withFileType(buildAllReachableOtherMetadataFileDF(staticTable), OTHERS));
-  }
-
-  /**
-   * Deletes files passed to it.
-   *
-   * @param deleted an Iterator of Spark Rows of the structure (path: String, type: String)
-   * @return Statistics on which files were deleted
-   */
-  private BaseDeleteReachableFilesActionResult deleteFiles(Iterator<Row> deleted) {
-    AtomicLong dataFileCount = new AtomicLong(0L);
-    AtomicLong manifestCount = new AtomicLong(0L);
-    AtomicLong manifestListCount = new AtomicLong(0L);
-    AtomicLong otherFilesCount = new AtomicLong(0L);
-
-    Tasks.foreach(deleted)
-        .retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished()
-        .executeWith(deleteExecutorService)
-        .onFailure((fileInfo, exc) -> {
-          String file = fileInfo.getString(0);
-          String type = fileInfo.getString(1);
-          LOG.warn("Delete failed for {}: {}", type, file, exc);
-        })
-        .run(fileInfo -> {
-          String file = fileInfo.getString(0);
-          String type = fileInfo.getString(1);
-          deleteFunc.accept(file);
-          switch (type) {
-            case CONTENT_FILE:
-              dataFileCount.incrementAndGet();
-              LOG.trace("Deleted Content File: {}", file);
-              break;
-            case MANIFEST:
-              manifestCount.incrementAndGet();
-              LOG.debug("Deleted Manifest: {}", file);
-              break;
-            case MANIFEST_LIST:
-              manifestListCount.incrementAndGet();
-              LOG.debug("Deleted Manifest List: {}", file);
-              break;
-            case OTHERS:
-              otherFilesCount.incrementAndGet();
-              LOG.debug("Others: {}", file);
-              break;
-          }
-        });
-
-    long filesCount = dataFileCount.get() + manifestCount.get() + manifestListCount.get() + otherFilesCount.get();
-    LOG.info("Total files removed: {}", filesCount);
-    return new BaseDeleteReachableFilesActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get(),
-        otherFilesCount.get());
+    super(spark, metadataFileLocation);
   }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
index 2216383337..cc70279f39 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
@@ -19,258 +19,54 @@
 
 package org.apache.iceberg.spark.actions;
 
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
-import org.apache.iceberg.FileContent;
-import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
-import org.apache.iceberg.actions.ExpireSnapshots;
-import org.apache.iceberg.exceptions.NotFoundException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.GC_ENABLED;
-import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
 /**
- * An action that performs the same operation as {@link org.apache.iceberg.ExpireSnapshots} but uses Spark
- * to determine the delta in files between the pre and post-expiration table metadata. All of the same
- * restrictions of {@link org.apache.iceberg.ExpireSnapshots} also apply to this action.
- * <p>
- * This action first leverages {@link org.apache.iceberg.ExpireSnapshots} to expire snapshots and then
- * uses metadata tables to find files that can be safely deleted. This is done by anti-joining two Datasets
- * that contain all manifest and content files before and after the expiration. The snapshot expiration
- * will be fully committed before any deletes are issued.
- * <p>
- * This operation performs a shuffle so the parallelism can be controlled through 'spark.sql.shuffle.partitions'.
- * <p>
- * Deletes are still performed locally after retrieving the results from the Spark executors.
+ * An action to expire snapshots.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ *             use {@link SparkActions} and {@link ExpireSnapshotsSparkAction} instead.
  */
-@SuppressWarnings("UnnecessaryAnonymousClass")
-public class BaseExpireSnapshotsSparkAction
-    extends BaseSparkAction<ExpireSnapshots, ExpireSnapshots.Result> implements ExpireSnapshots {
+@Deprecated
+public class BaseExpireSnapshotsSparkAction extends ExpireSnapshotsSparkAction {
 
   public static final String STREAM_RESULTS = "stream-results";
   public static final boolean STREAM_RESULTS_DEFAULT = false;
 
-  private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class);
-
-  private final Table table;
-  private final TableOperations ops;
-  private final Consumer<String> defaultDelete = new Consumer<String>() {
-    @Override
-    public void accept(String file) {
-      ops.io().deleteFile(file);
-    }
-  };
-
-  private final Set<Long> expiredSnapshotIds = Sets.newHashSet();
-  private Long expireOlderThanValue = null;
-  private Integer retainLastValue = null;
-  private Consumer<String> deleteFunc = defaultDelete;
-  private ExecutorService deleteExecutorService = null;
-  private Dataset<Row> expiredFiles = null;
-
   public BaseExpireSnapshotsSparkAction(SparkSession spark, Table table) {
-    super(spark);
-    this.table = table;
-    this.ops = ((HasTableOperations) table).operations();
-
-    ValidationException.check(
-        PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
-        "Cannot expire snapshots: GC is disabled (deleting files may corrupt other tables)");
-  }
-
-  @Override
-  protected ExpireSnapshots self() {
-    return this;
+    super(spark, table);
   }
 
   @Override
   public BaseExpireSnapshotsSparkAction executeDeleteWith(ExecutorService executorService) {
-    this.deleteExecutorService = executorService;
+    super.executeDeleteWith(executorService);
     return this;
   }
 
   @Override
   public BaseExpireSnapshotsSparkAction expireSnapshotId(long snapshotId) {
-    expiredSnapshotIds.add(snapshotId);
+    super.expireSnapshotId(snapshotId);
     return this;
   }
 
   @Override
   public BaseExpireSnapshotsSparkAction expireOlderThan(long timestampMillis) {
-    this.expireOlderThanValue = timestampMillis;
+    super.expireOlderThan(timestampMillis);
     return this;
   }
 
   @Override
   public BaseExpireSnapshotsSparkAction retainLast(int numSnapshots) {
-    Preconditions.checkArgument(1 <= numSnapshots,
-        "Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots);
-    this.retainLastValue = numSnapshots;
+    super.retainLast(numSnapshots);
     return this;
   }
 
   @Override
   public BaseExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc) {
-    this.deleteFunc = newDeleteFunc;
+    super.deleteWith(newDeleteFunc);
     return this;
   }
-
-  /**
-   * Expires snapshots and commits the changes to the table, returning a Dataset of files to delete.
-   * <p>
-   * This does not delete data files. To delete data files, run {@link #execute()}.
-   * <p>
-   * This may be called before or after {@link #execute()} is called to return the expired file list.
-   *
-   * @return a Dataset of files that are no longer referenced by the table
-   */
-  public Dataset<Row> expire() {
-    if (expiredFiles == null) {
-      // fetch metadata before expiration
-      Dataset<Row> originalFiles = buildValidFileDF(ops.current());
-
-      // perform expiration
-      org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots().cleanExpiredFiles(false);
-      for (long id : expiredSnapshotIds) {
-        expireSnapshots = expireSnapshots.expireSnapshotId(id);
-      }
-
-      if (expireOlderThanValue != null) {
-        expireSnapshots = expireSnapshots.expireOlderThan(expireOlderThanValue);
-      }
-
-      if (retainLastValue != null) {
-        expireSnapshots = expireSnapshots.retainLast(retainLastValue);
-      }
-
-      expireSnapshots.commit();
-
-      // fetch metadata after expiration
-      Dataset<Row> validFiles = buildValidFileDF(ops.refresh());
-
-      // determine expired files
-      this.expiredFiles = originalFiles.except(validFiles);
-    }
-
-    return expiredFiles;
-  }
-
-  @Override
-  public ExpireSnapshots.Result execute() {
-    JobGroupInfo info = newJobGroupInfo("EXPIRE-SNAPSHOTS", jobDesc());
-    return withJobGroupInfo(info, this::doExecute);
-  }
-
-  private String jobDesc() {
-    List<String> options = Lists.newArrayList();
-
-    if (expireOlderThanValue != null) {
-      options.add("older_than=" + expireOlderThanValue);
-    }
-
-    if (retainLastValue != null) {
-      options.add("retain_last=" + retainLastValue);
-    }
-
-    if (!expiredSnapshotIds.isEmpty()) {
-      Long first = expiredSnapshotIds.stream().findFirst().get();
-      if (expiredSnapshotIds.size() > 1) {
-        options.add(String.format("snapshot_ids: %s (%s more...)", first, expiredSnapshotIds.size() - 1));
-      } else {
-        options.add(String.format("snapshot_id: %s", first));
-      }
-    }
-
-    return String.format("Expiring snapshots (%s) in %s", Joiner.on(',').join(options), table.name());
-  }
-
-  private ExpireSnapshots.Result doExecute() {
-    boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
-    if (streamResults) {
-      return deleteFiles(expire().toLocalIterator());
-    } else {
-      return deleteFiles(expire().collectAsList().iterator());
-    }
-  }
-
-  private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
-    Table staticTable = newStaticTable(metadata, table.io());
-    return buildValidContentFileWithTypeDF(staticTable)
-        .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
-        .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
-  }
-
-  /**
-   * Deletes files passed to it based on their type.
-   *
-   * @param expired an Iterator of Spark Rows of the structure (path: String, type: String)
-   * @return Statistics on which files were deleted
-   */
-  private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
-    AtomicLong dataFileCount = new AtomicLong(0L);
-    AtomicLong posDeleteFileCount = new AtomicLong(0L);
-    AtomicLong eqDeleteFileCount = new AtomicLong(0L);
-    AtomicLong manifestCount = new AtomicLong(0L);
-    AtomicLong manifestListCount = new AtomicLong(0L);
-
-    Tasks.foreach(expired)
-        .retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished()
-        .executeWith(deleteExecutorService)
-        .onFailure((fileInfo, exc) -> {
-          String file = fileInfo.getString(0);
-          String type = fileInfo.getString(1);
-          LOG.warn("Delete failed for {}: {}", type, file, exc);
-        })
-        .run(fileInfo -> {
-          String file = fileInfo.getString(0);
-          String type = fileInfo.getString(1);
-          deleteFunc.accept(file);
-
-          if (FileContent.DATA.name().equalsIgnoreCase(type)) {
-            dataFileCount.incrementAndGet();
-            LOG.trace("Deleted Data File: {}", file);
-          } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
-            posDeleteFileCount.incrementAndGet();
-            LOG.trace("Deleted Positional Delete File: {}", file);
-          } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
-            eqDeleteFileCount.incrementAndGet();
-            LOG.trace("Deleted Equality Delete File: {}", file);
-          } else if (MANIFEST.equals(type)) {
-            manifestCount.incrementAndGet();
-            LOG.debug("Deleted Manifest: {}", file);
-          } else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
-            manifestListCount.incrementAndGet();
-            LOG.debug("Deleted Manifest List: {}", file);
-          } else {
-            throw new ValidationException("Illegal file type: %s", type);
-          }
-        });
-
-    long contentFileCount = dataFileCount.get() + posDeleteFileCount.get() + eqDeleteFileCount.get();
-    LOG.info("Deleted {} total files", contentFileCount + manifestCount.get() + manifestListCount.get());
-
-    return new BaseExpireSnapshotsActionResult(dataFileCount.get(), posDeleteFileCount.get(),
-        eqDeleteFileCount.get(), manifestCount.get(), manifestListCount.get());
-  }
 }
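
(Illustration, not part of the patch.) The deleted implementation above documents the stream-results option, which iterates the expired-file delta instead of collecting it to the driver, and shows that deletes run locally and can be parallelized. A hedged sketch of driving those knobs through the new class; the option key is taken from the STREAM_RESULTS constant kept in this file.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class ExpireSnapshotsSketch {
      public static void run(SparkSession spark, Table table) {
        ExecutorService deletePool = Executors.newFixedThreadPool(4);
        try {
          ExpireSnapshots.Result result = SparkActions.get(spark)
              .expireSnapshots(table)
              .expireOlderThan(System.currentTimeMillis())  // expire snapshots not otherwise retained
              .retainLast(10)                               // but always keep the 10 newest snapshots
              .option("stream-results", "true")             // iterate expired files instead of collecting them
              .executeDeleteWith(deletePool)                // parallelize the local delete calls
              .execute();
        } finally {
          deletePool.shutdown();
        }
      }
    }
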
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
index ef9f0d3e25..78a6fa20f5 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
@@ -19,196 +19,19 @@
 
 package org.apache.iceberg.spark.actions;
 
-import java.util.Map;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BaseMigrateTableActionResult;
-import org.apache.iceberg.actions.MigrateTable;
-import org.apache.iceberg.exceptions.AlreadyExistsException;
-import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.SparkSessionCatalog;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.source.StagedSparkTable;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Some;
-import scala.collection.JavaConverters;
 
 /**
- * Takes a Spark table in the source catalog and attempts to transform it into an Iceberg
- * table in the same location with the same identifier. Once complete the identifier which
- * previously referred to a non-Iceberg table will refer to the newly migrated Iceberg
- * table.
+ * An action to migrate a table to Iceberg.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ *             use {@link SparkActions} and {@link MigrateTableSparkAction} instead.
  */
-public class BaseMigrateTableSparkAction
-    extends BaseTableCreationSparkAction<MigrateTable, MigrateTable.Result>
-    implements MigrateTable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BaseMigrateTableSparkAction.class);
-  private static final String BACKUP_SUFFIX = "_BACKUP_";
-
-  private final StagingTableCatalog destCatalog;
-  private final Identifier destTableIdent;
-  private final Identifier backupIdent;
-
+@Deprecated
+public class BaseMigrateTableSparkAction extends MigrateTableSparkAction {
   public BaseMigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
     super(spark, sourceCatalog, sourceTableIdent);
-    this.destCatalog = checkDestinationCatalog(sourceCatalog);
-    this.destTableIdent = sourceTableIdent;
-    String backupName = sourceTableIdent.name() + BACKUP_SUFFIX;
-    this.backupIdent = Identifier.of(sourceTableIdent.namespace(), backupName);
-  }
-
-  @Override
-  protected MigrateTable self() {
-    return this;
-  }
-
-  @Override
-  protected StagingTableCatalog destCatalog() {
-    return destCatalog;
-  }
-
-  @Override
-  protected Identifier destTableIdent() {
-    return destTableIdent;
-  }
-
-  @Override
-  public MigrateTable tableProperties(Map<String, String> properties) {
-    setProperties(properties);
-    return this;
-  }
-
-  @Override
-  public MigrateTable tableProperty(String property, String value) {
-    setProperty(property, value);
-    return this;
-  }
-
-  @Override
-  public MigrateTable.Result execute() {
-    String desc = String.format("Migrating table %s", destTableIdent().toString());
-    JobGroupInfo info = newJobGroupInfo("MIGRATE-TABLE", desc);
-    return withJobGroupInfo(info, this::doExecute);
-  }
-
-  private MigrateTable.Result doExecute() {
-    LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
-
-    // move the source table to a new name, halting all modifications and allowing us to stage
-    // the creation of a new Iceberg table in its place
-    renameAndBackupSourceTable();
-
-    StagedSparkTable stagedTable = null;
-    Table icebergTable;
-    boolean threw = true;
-    try {
-      LOG.info("Staging a new Iceberg table {}", destTableIdent());
-      stagedTable = stageDestTable();
-      icebergTable = stagedTable.table();
-
-      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
-      ensureNameMappingPresent(icebergTable);
-
-      Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
-      TableIdentifier v1BackupIdent = new TableIdentifier(backupIdent.name(), backupNamespace);
-      String stagingLocation = getMetadataLocation(icebergTable);
-      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
-      SparkTableUtil.importSparkTable(spark(), v1BackupIdent, icebergTable, stagingLocation);
-
-      LOG.info("Committing staged changes to {}", destTableIdent());
-      stagedTable.commitStagedChanges();
-      threw = false;
-    } finally {
-      if (threw) {
-        LOG.error("Failed to perform the migration, aborting table creation and restoring the original table");
-
-        restoreSourceTable();
-
-        if (stagedTable != null) {
-          try {
-            stagedTable.abortStagedChanges();
-          } catch (Exception abortException) {
-            LOG.error("Cannot abort staged changes", abortException);
-          }
-        }
-      }
-    }
-
-    Snapshot snapshot = icebergTable.currentSnapshot();
-    long migratedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
-    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", migratedDataFilesCount, destTableIdent());
-    return new BaseMigrateTableActionResult(migratedDataFilesCount);
-  }
-
-  @Override
-  protected Map<String, String> destTableProps() {
-    Map<String, String> properties = Maps.newHashMap();
-
-    // copy over relevant source table props
-    properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
-    EXCLUDED_PROPERTIES.forEach(properties::remove);
-
-    // set default and user-provided props
-    properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
-    properties.putAll(additionalProperties());
-
-    // make sure we mark this table as migrated
-    properties.put("migrated", "true");
-
-    // inherit the source table location
-    properties.putIfAbsent(LOCATION, sourceTableLocation());
-
-    return properties;
-  }
-
-  @Override
-  protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
-    // currently the import code relies on being able to look up the table in the session catalog
-    Preconditions.checkArgument(catalog instanceof SparkSessionCatalog,
-        "Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found %s of class %s as the source catalog.",
-        catalog.name(), catalog.getClass().getName());
-
-    return (TableCatalog) catalog;
-  }
-
-  private void renameAndBackupSourceTable() {
-    try {
-      LOG.info("Renaming {} as {} for backup", sourceTableIdent(), backupIdent);
-      destCatalog().renameTable(sourceTableIdent(), backupIdent);
-
-    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
-      throw new NoSuchTableException("Cannot find source table %s", sourceTableIdent());
-
-    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
-      throw new AlreadyExistsException(
-          "Cannot rename %s as %s for backup. The backup table already exists.",
-          sourceTableIdent(), backupIdent);
-    }
-  }
-
-  private void restoreSourceTable() {
-    try {
-      LOG.info("Restoring {} from {}", sourceTableIdent(), backupIdent);
-      destCatalog().renameTable(backupIdent, sourceTableIdent());
-
-    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
-      LOG.error("Cannot restore the original table, the backup table {} cannot be found", backupIdent, e);
-
-    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
-      LOG.error("Cannot restore the original table, a table with the original name exists. " +
-          "Use the backup table {} to restore the original table manually.", backupIdent, e);
-    }
   }
 }
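
(Illustration, not part of the patch.) The removed code above walks through the backup/stage/import/commit sequence for migration. A hedged sketch of invoking it through SparkActions; the table identifier and property are placeholders, and the result accessor is assumed from the MigrateTable.Result interface.

    import org.apache.iceberg.actions.MigrateTable;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class MigrateTableSketch {
      public static void run(SparkSession spark) {
        // Converts the Spark table in place, keeping its identifier; the source must
        // resolve through a SparkSessionCatalog, as the removed check required.
        MigrateTable.Result result = SparkActions.get(spark)
            .migrateTable("db.source_table")                  // placeholder identifier
            .tableProperty("note", "converted by migrate action")  // placeholder property
            .execute();

        long migratedFiles = result.migratedDataFilesCount();
      }
    }
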
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index d1937fcbcd..d23a367fbd 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -19,459 +19,18 @@
 
 package org.apache.iceberg.spark.actions;
 
-import java.io.IOException;
-import java.math.RoundingMode;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.RewriteJobOrder;
-import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BaseRewriteDataFilesFileGroupInfo;
-import org.apache.iceberg.actions.BaseRewriteDataFilesResult;
-import org.apache.iceberg.actions.BinPackStrategy;
-import org.apache.iceberg.actions.RewriteDataFiles;
-import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
-import org.apache.iceberg.actions.RewriteFileGroup;
-import org.apache.iceberg.actions.RewriteStrategy;
-import org.apache.iceberg.actions.SortStrategy;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.exceptions.CommitFailedException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Queues;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.math.IntMath;
-import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
-import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.iceberg.types.Types.StructType;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.StructLikeMap;
-import org.apache.iceberg.util.Tasks;
 import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BaseRewriteDataFilesSparkAction
-    extends BaseSnapshotUpdateSparkAction<RewriteDataFiles, RewriteDataFiles.Result> implements RewriteDataFiles {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesSparkAction.class);
-  private static final Set<String> VALID_OPTIONS = ImmutableSet.of(
-      MAX_CONCURRENT_FILE_GROUP_REWRITES,
-      MAX_FILE_GROUP_SIZE_BYTES,
-      PARTIAL_PROGRESS_ENABLED,
-      PARTIAL_PROGRESS_MAX_COMMITS,
-      TARGET_FILE_SIZE_BYTES,
-      USE_STARTING_SEQUENCE_NUMBER,
-      REWRITE_JOB_ORDER
-  );
-
-  private final Table table;
-
-  private Expression filter = Expressions.alwaysTrue();
-  private int maxConcurrentFileGroupRewrites;
-  private int maxCommits;
-  private boolean partialProgressEnabled;
-  private boolean useStartingSequenceNumber;
-  private RewriteJobOrder rewriteJobOrder;
-  private RewriteStrategy strategy = null;
 
+/**
+ * An action to rewrite data files.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ *             use {@link SparkActions} and {@link RewriteDataFilesSparkAction} instead.
+ */
+@Deprecated
+public class BaseRewriteDataFilesSparkAction extends RewriteDataFilesSparkAction {
   protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
-    super(spark);
-    this.table = table;
-  }
-
-  @Override
-  protected RewriteDataFiles self() {
-    return this;
-  }
-
-  @Override
-  public RewriteDataFiles binPack() {
-    Preconditions.checkArgument(this.strategy == null,
-        "Cannot set strategy to binpack, it has already been set", this.strategy);
-    this.strategy = binPackStrategy();
-    return this;
-  }
-
-  @Override
-  public RewriteDataFiles sort(SortOrder sortOrder) {
-    Preconditions.checkArgument(this.strategy == null,
-        "Cannot set strategy to sort, it has already been set to %s", this.strategy);
-    this.strategy = sortStrategy().sortOrder(sortOrder);
-    return this;
-  }
-
-  @Override
-  public RewriteDataFiles sort() {
-    Preconditions.checkArgument(this.strategy == null,
-        "Cannot set strategy to sort, it has already been set to %s", this.strategy);
-    this.strategy = sortStrategy();
-    return this;
-  }
-
-  @Override
-  public RewriteDataFiles zOrder(String... columnNames) {
-    Preconditions.checkArgument(this.strategy == null,
-        "Cannot set strategy to zorder, it has already been set to %s", this.strategy);
-    this.strategy = zOrderStrategy(columnNames);
-    return this;
-  }
-
-  @Override
-  public RewriteDataFiles filter(Expression expression) {
-    filter = Expressions.and(filter, expression);
-    return this;
-  }
-
-  @Override
-  public RewriteDataFiles.Result execute() {
-    if (table.currentSnapshot() == null) {
-      return new BaseRewriteDataFilesResult(ImmutableList.of());
-    }
-
-    long startingSnapshotId = table.currentSnapshot().snapshotId();
-
-    // Default to BinPack if no strategy selected
-    if (this.strategy == null) {
-      this.strategy = binPackStrategy();
-    }
-
-    validateAndInitOptions();
-
-    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition = planFileGroups(startingSnapshotId);
-    RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
-
-    if (ctx.totalGroupCount() == 0) {
-      LOG.info("Nothing found to rewrite in {}", table.name());
-      return new BaseRewriteDataFilesResult(Collections.emptyList());
-    }
-
-    Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
-
-    RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId);
-    if (partialProgressEnabled) {
-      return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
-    } else {
-      return doExecute(ctx, groupStream, commitManager);
-    }
-  }
-
-  Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
-    CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
-        .useSnapshot(startingSnapshotId)
-        .filter(filter)
-        .ignoreResiduals()
-        .planFiles();
-
-    try {
-      StructType partitionType = table.spec().partitionType();
-      StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
-      StructLike emptyStruct = GenericRecord.create(partitionType);
-
-      fileScanTasks.forEach(task -> {
-        // If a task uses an incompatible partition spec the data inside could contain values which
-        // belong to multiple partitions in the current spec. Treating all such files as un-partitioned and
-        // grouping them together helps to minimize new files made.
-        StructLike taskPartition = task.file().specId() == table.spec().specId() ?
-            task.file().partition() : emptyStruct;
-
-        List<FileScanTask> files = filesByPartition.get(taskPartition);
-        if (files == null) {
-          files = Lists.newArrayList();
-        }
-
-        files.add(task);
-        filesByPartition.put(taskPartition, files);
-      });
-
-      StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition = StructLikeMap.create(partitionType);
-
-      filesByPartition.forEach((partition, tasks) -> {
-        Iterable<FileScanTask> filtered = strategy.selectFilesToRewrite(tasks);
-        Iterable<List<FileScanTask>> groupedTasks = strategy.planFileGroups(filtered);
-        List<List<FileScanTask>> fileGroups = ImmutableList.copyOf(groupedTasks);
-        if (fileGroups.size() > 0) {
-          fileGroupsByPartition.put(partition, fileGroups);
-        }
-      });
-
-      return fileGroupsByPartition;
-    } finally {
-      try {
-        fileScanTasks.close();
-      } catch (IOException io) {
-        LOG.error("Cannot properly close file iterable while planning for rewrite", io);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
-    String desc = jobDesc(fileGroup, ctx);
-    Set<DataFile> addedFiles = withJobGroupInfo(
-        newJobGroupInfo("REWRITE-DATA-FILES", desc),
-        () -> strategy.rewriteFiles(fileGroup.fileScans()));
-
-    fileGroup.setOutputFiles(addedFiles);
-    LOG.info("Rewrite Files Ready to be Committed - {}", desc);
-    return fileGroup;
-  }
-
-  private ExecutorService rewriteService() {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) Executors.newFixedThreadPool(
-            maxConcurrentFileGroupRewrites,
-            new ThreadFactoryBuilder()
-                .setNameFormat("Rewrite-Service-%d")
-                .build()));
-  }
-
-  @VisibleForTesting
-  RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
-    return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
-  }
-
-  private Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
-                           RewriteDataFilesCommitManager commitManager) {
-    ExecutorService rewriteService = rewriteService();
-
-    ConcurrentLinkedQueue<RewriteFileGroup> rewrittenGroups = Queues.newConcurrentLinkedQueue();
-
-    Tasks.Builder<RewriteFileGroup> rewriteTaskBuilder = Tasks.foreach(groupStream)
-        .executeWith(rewriteService)
-        .stopOnFailure()
-        .noRetry()
-        .onFailure((fileGroup, exception) -> {
-          LOG.warn("Failure during rewrite process for group {}", fileGroup.info(), exception);
-        });
-
-    try {
-      rewriteTaskBuilder.run(fileGroup -> {
-        rewrittenGroups.add(rewriteFiles(ctx, fileGroup));
-      });
-    } catch (Exception e) {
-      // At least one rewrite group failed, clean up all completed rewrites
-      LOG.error("Cannot complete rewrite, {} is not enabled and one of the file set groups failed to " +
-          "be rewritten. This error occurred during the writing of new files, not during the commit process. This " +
-          "indicates something is wrong that doesn't involve conflicts with other Iceberg operations. Enabling " +
-          "{} may help in this case but the root cause should be investigated. Cleaning up {} groups which finished " +
-          "being written.", PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_ENABLED, rewrittenGroups.size(), e);
-
-      Tasks.foreach(rewrittenGroups)
-          .suppressFailureWhenFinished()
-          .run(group -> commitManager.abortFileGroup(group));
-      throw e;
-    } finally {
-      rewriteService.shutdown();
-    }
-
-    try {
-      commitManager.commitOrClean(Sets.newHashSet(rewrittenGroups));
-    } catch (ValidationException | CommitFailedException e) {
-      String errorMessage = String.format(
-          "Cannot commit rewrite because of a ValidationException or CommitFailedException. This usually means that " +
-              "this rewrite has conflicted with another concurrent Iceberg operation. To reduce the likelihood of " +
-              "conflicts, set %s which will break up the rewrite into multiple smaller commits controlled by %s. " +
-              "Separate smaller rewrite commits can succeed independently while any commits that conflict with " +
-              "another Iceberg operation will be ignored. This mode will create additional snapshots in the table " +
-              "history, one for each commit.",
-          PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_MAX_COMMITS);
-      throw new RuntimeException(errorMessage, e);
-    }
-
-    List<FileGroupRewriteResult> rewriteResults = rewrittenGroups.stream()
-        .map(RewriteFileGroup::asResult)
-        .collect(Collectors.toList());
-    return new BaseRewriteDataFilesResult(rewriteResults);
-  }
-
-  private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
-                                              RewriteDataFilesCommitManager commitManager) {
-    ExecutorService rewriteService = rewriteService();
-
-    // Start Commit Service
-    int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
-    RewriteDataFilesCommitManager.CommitService commitService = commitManager.service(groupsPerCommit);
-    commitService.start();
-
-    // Start rewrite tasks
-    Tasks.foreach(groupStream)
-        .suppressFailureWhenFinished()
-        .executeWith(rewriteService)
-        .noRetry()
-        .onFailure((fileGroup, exception) -> LOG.error("Failure during rewrite group {}", fileGroup.info(), exception))
-        .run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
-    rewriteService.shutdown();
-
-    // Stop Commit service
-    commitService.close();
-    List<RewriteFileGroup> commitResults = commitService.results();
-    if (commitResults.size() == 0) {
-      LOG.error("{} is true but no rewrite commits succeeded. Check the logs to determine why the individual " +
-          "commits failed. If this is persistent it may help to increase {} which will break the rewrite operation " +
-          "into smaller commits.", PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_MAX_COMMITS);
-    }
-
-    List<FileGroupRewriteResult> rewriteResults = commitResults.stream()
-        .map(RewriteFileGroup::asResult)
-        .collect(Collectors.toList());
-    return new BaseRewriteDataFilesResult(rewriteResults);
-  }
-
-  Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
-      Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-    Stream<RewriteFileGroup> rewriteFileGroupStream = fileGroupsByPartition.entrySet().stream()
-        .flatMap(e -> {
-          StructLike partition = e.getKey();
-          List<List<FileScanTask>> fileGroups = e.getValue();
-          return fileGroups.stream().map(tasks -> {
-            int globalIndex = ctx.currentGlobalIndex();
-            int partitionIndex = ctx.currentPartitionIndex(partition);
-            FileGroupInfo info = new BaseRewriteDataFilesFileGroupInfo(globalIndex, partitionIndex, partition);
-            return new RewriteFileGroup(info, tasks);
-          });
-        });
-
-    return rewriteFileGroupStream.sorted(rewriteGroupComparator());
-  }
-
-  private Comparator<RewriteFileGroup> rewriteGroupComparator() {
-    switch (rewriteJobOrder) {
-      case BYTES_ASC:
-        return Comparator.comparing(RewriteFileGroup::sizeInBytes);
-      case BYTES_DESC:
-        return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
-      case FILES_ASC:
-        return Comparator.comparing(RewriteFileGroup::numFiles);
-      case FILES_DESC:
-        return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
-      default:
-        return (fileGroupOne, fileGroupTwo) -> 0;
-    }
-  }
-
-  void validateAndInitOptions() {
-    Set<String> validOptions = Sets.newHashSet(strategy.validOptions());
-    validOptions.addAll(VALID_OPTIONS);
-
-    Set<String> invalidKeys = Sets.newHashSet(options().keySet());
-    invalidKeys.removeAll(validOptions);
-
-    Preconditions.checkArgument(invalidKeys.isEmpty(),
-        "Cannot use options %s, they are not supported by the action or the strategy %s",
-        invalidKeys, strategy.name());
-
-    strategy = strategy.options(options());
-
-    maxConcurrentFileGroupRewrites = PropertyUtil.propertyAsInt(options(),
-        MAX_CONCURRENT_FILE_GROUP_REWRITES,
-        MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT);
-
-    maxCommits = PropertyUtil.propertyAsInt(options(),
-        PARTIAL_PROGRESS_MAX_COMMITS,
-        PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT);
-
-    partialProgressEnabled = PropertyUtil.propertyAsBoolean(options(),
-        PARTIAL_PROGRESS_ENABLED,
-        PARTIAL_PROGRESS_ENABLED_DEFAULT);
-
-    useStartingSequenceNumber = PropertyUtil.propertyAsBoolean(options(),
-        USE_STARTING_SEQUENCE_NUMBER,
-        USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
-
-    rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
-        REWRITE_JOB_ORDER,
-        REWRITE_JOB_ORDER_DEFAULT));
-
-    Preconditions.checkArgument(maxConcurrentFileGroupRewrites >= 1,
-        "Cannot set %s to %s, the value must be positive.",
-        MAX_CONCURRENT_FILE_GROUP_REWRITES, maxConcurrentFileGroupRewrites);
-
-    Preconditions.checkArgument(!partialProgressEnabled || maxCommits > 0,
-        "Cannot set %s to %s, the value must be positive when %s is true",
-        PARTIAL_PROGRESS_MAX_COMMITS, maxCommits, PARTIAL_PROGRESS_ENABLED);
-  }
-
-  private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) {
-    StructLike partition = group.info().partition();
-    if (partition.size() > 0) {
-      return String.format("Rewriting %d files (%s, file group %d/%d, %s (%d/%d)) in %s",
-          group.rewrittenFiles().size(),
-          strategy.name(), group.info().globalIndex(),
-          ctx.totalGroupCount(), partition, group.info().partitionIndex(), ctx.groupsInPartition(partition),
-          table.name());
-    } else {
-      return String.format("Rewriting %d files (%s, file group %d/%d) in %s",
-          group.rewrittenFiles().size(),
-          strategy.name(), group.info().globalIndex(), ctx.totalGroupCount(),
-          table.name());
-    }
-  }
-
-  private BinPackStrategy binPackStrategy() {
-    return new SparkBinPackStrategy(table, spark());
-  }
-
-  private SortStrategy sortStrategy() {
-    return new SparkSortStrategy(table, spark());
-  }
-
-  private SortStrategy zOrderStrategy(String... columnNames) {
-    return new SparkZOrderStrategy(table, spark(), Lists.newArrayList(columnNames));
-  }
-
-  @VisibleForTesting
-  static class RewriteExecutionContext {
-    private final Map<StructLike, Integer> numGroupsByPartition;
-    private final int totalGroupCount;
-    private final Map<StructLike, Integer> partitionIndexMap;
-    private final AtomicInteger groupIndex;
-
-    RewriteExecutionContext(Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-      this.numGroupsByPartition = fileGroupsByPartition.entrySet().stream()
-          .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
-      this.totalGroupCount = numGroupsByPartition.values().stream()
-          .reduce(Integer::sum)
-          .orElse(0);
-      this.partitionIndexMap = Maps.newConcurrentMap();
-      this.groupIndex = new AtomicInteger(1);
-    }
-
-    public int currentGlobalIndex() {
-      return groupIndex.getAndIncrement();
-    }
-
-    public int currentPartitionIndex(StructLike partition) {
-      return partitionIndexMap.merge(partition, 1, Integer::sum);
-    }
-
-    public int groupsInPartition(StructLike partition) {
-      return numGroupsByPartition.get(partition);
-    }
-
-    public int totalGroupCount() {
-      return totalGroupCount;
-    }
+    super(spark, table);
   }
 }
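
The deleted body above shows the knobs the rewrite action validates before planning file groups: PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_MAX_COMMITS and MAX_CONCURRENT_FILE_GROUP_REWRITES. A minimal caller-side sketch of supplying those options through SparkActions; the `spark` session and `table` handle are placeholders and the option values are arbitrary:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class RewriteWithPartialProgress {
      public static RewriteDataFiles.Result run(SparkSession spark, Table table) {
        return SparkActions.get(spark)
            .rewriteDataFiles(table)
            .binPack()
            // option keys are the RewriteDataFiles constants read by validateAndInitOptions() above
            .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
            .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "10")
            .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "5")
            .execute();
      }
    }
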
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index 1648dd26f8..d570397fc5 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -19,352 +19,18 @@
 
 package org.apache.iceberg.spark.actions;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.HasTableOperations;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.ManifestWriter;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
-import org.apache.iceberg.actions.RewriteManifests;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.SparkDataFile;
-import org.apache.iceberg.spark.SparkUtil;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.iceberg.util.ThreadPools;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.MapPartitionsFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.Column;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.internal.SQLConf;
-import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.MetadataTableType.ENTRIES;
 
 /**
- * An action that rewrites manifests in a distributed manner and co-locates metadata for partitions.
- * <p>
- * By default, this action rewrites all manifests for the current partition spec and writes the result
- * to the metadata folder. The behavior can be modified by passing a custom predicate to {@link #rewriteIf(Predicate)}
- * and a custom spec id to {@link #specId(int)}. In addition, there is a way to configure a custom location
- * for new manifests via {@link #stagingLocation}.
+ * An action to rewrite manifests.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ *             use {@link SparkActions} and {@link RewriteManifestsSparkAction} instead.
  */
-public class BaseRewriteManifestsSparkAction
-    extends BaseSnapshotUpdateSparkAction<RewriteManifests, RewriteManifests.Result>
-    implements RewriteManifests {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteManifestsSparkAction.class);
-
-  private static final String USE_CACHING = "use-caching";
-  private static final boolean USE_CACHING_DEFAULT = true;
-
-  private final Encoder<ManifestFile> manifestEncoder;
-  private final Table table;
-  private final int formatVersion;
-  private final FileIO fileIO;
-  private final long targetManifestSizeBytes;
-
-  private PartitionSpec spec = null;
-  private Predicate<ManifestFile> predicate = manifest -> true;
-  private String stagingLocation = null;
-
+@Deprecated
+public class BaseRewriteManifestsSparkAction extends RewriteManifestsSparkAction {
   public BaseRewriteManifestsSparkAction(SparkSession spark, Table table) {
-    super(spark);
-    this.manifestEncoder = Encoders.javaSerialization(ManifestFile.class);
-    this.table = table;
-    this.spec = table.spec();
-    this.targetManifestSizeBytes = PropertyUtil.propertyAsLong(
-        table.properties(),
-        TableProperties.MANIFEST_TARGET_SIZE_BYTES,
-        TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
-    this.fileIO = SparkUtil.serializableFileIO(table);
-
-    // default the staging location to the metadata location
-    TableOperations ops = ((HasTableOperations) table).operations();
-    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
-    this.stagingLocation = metadataFilePath.getParent().toString();
-
-    // use the current table format version for new manifests
-    this.formatVersion = ops.current().formatVersion();
-  }
-
-  @Override
-  protected RewriteManifests self() {
-    return this;
-  }
-
-  @Override
-  public RewriteManifests specId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %s", specId);
-    this.spec = table.specs().get(specId);
-    return this;
-  }
-
-  @Override
-  public RewriteManifests rewriteIf(Predicate<ManifestFile> newPredicate) {
-    this.predicate = newPredicate;
-    return this;
-  }
-
-  @Override
-  public RewriteManifests stagingLocation(String newStagingLocation) {
-    this.stagingLocation = newStagingLocation;
-    return this;
-  }
-
-  @Override
-  public RewriteManifests.Result execute() {
-    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name());
-    JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
-    return withJobGroupInfo(info, this::doExecute);
-  }
-
-  private RewriteManifests.Result doExecute() {
-    List<ManifestFile> matchingManifests = findMatchingManifests();
-    if (matchingManifests.isEmpty()) {
-      return BaseRewriteManifestsActionResult.empty();
-    }
-
-    long totalSizeBytes = 0L;
-    int numEntries = 0;
-
-    for (ManifestFile manifest : matchingManifests) {
-      ValidationException.check(hasFileCounts(manifest), "No file counts in manifest: %s", manifest.path());
-
-      totalSizeBytes += manifest.length();
-      numEntries += manifest.addedFilesCount() + manifest.existingFilesCount() + manifest.deletedFilesCount();
-    }
-
-    int targetNumManifests = targetNumManifests(totalSizeBytes);
-    int targetNumManifestEntries = targetNumManifestEntries(numEntries, targetNumManifests);
-
-    Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);
-
-    List<ManifestFile> newManifests;
-    if (spec.fields().size() < 1) {
-      newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, targetNumManifests);
-    } else {
-      newManifests = writeManifestsForPartitionedTable(manifestEntryDF, targetNumManifests, targetNumManifestEntries);
-    }
-
-    replaceManifests(matchingManifests, newManifests);
-
-    return new BaseRewriteManifestsActionResult(matchingManifests, newManifests);
-  }
-
-  private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
-    Dataset<Row> manifestDF = spark()
-        .createDataset(Lists.transform(manifests, ManifestFile::path), Encoders.STRING())
-        .toDF("manifest");
-
-    Dataset<Row> manifestEntryDF = loadMetadataTable(table, ENTRIES)
-        .filter("status < 2") // select only live entries
-        .selectExpr("input_file_name() as manifest", "snapshot_id", "sequence_number", "data_file");
-
-    Column joinCond = manifestDF.col("manifest").equalTo(manifestEntryDF.col("manifest"));
-    return manifestEntryDF
-        .join(manifestDF, joinCond, "left_semi")
-        .select("snapshot_id", "sequence_number", "data_file");
-  }
-
-  private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
-    Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
-    StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
-
-    // we rely only on the target number of manifests for unpartitioned tables
-    // as we should not worry about having too much metadata per partition
-    long maxNumManifestEntries = Long.MAX_VALUE;
-
-    return manifestEntryDF
-        .repartition(numManifests)
-        .mapPartitions(
-            toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-            manifestEncoder
-        )
-        .collectAsList();
-  }
-
-  private List<ManifestFile> writeManifestsForPartitionedTable(
-      Dataset<Row> manifestEntryDF, int numManifests,
-      int targetNumManifestEntries) {
-
-    Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
-    StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
-
-    // we allow the actual size of manifests to be 10% higher if the estimation is not precise enough
-    long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
-
-    return withReusableDS(manifestEntryDF, df -> {
-      Column partitionColumn = df.col("data_file.partition");
-      return df.repartitionByRange(numManifests, partitionColumn)
-          .sortWithinPartitions(partitionColumn)
-          .mapPartitions(
-              toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-              manifestEncoder
-          )
-          .collectAsList();
-    });
-  }
-
-  private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
-    Dataset<T> reusableDS;
-    boolean useCaching = PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
-    if (useCaching) {
-      reusableDS = ds.cache();
-    } else {
-      int parallelism = SQLConf.get().numShufflePartitions();
-      reusableDS = ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
-    }
-
-    try {
-      return func.apply(reusableDS);
-    } finally {
-      if (useCaching) {
-        reusableDS.unpersist(false);
-      }
-    }
-  }
-
-  private List<ManifestFile> findMatchingManifests() {
-    Snapshot currentSnapshot = table.currentSnapshot();
-
-    if (currentSnapshot == null) {
-      return ImmutableList.of();
-    }
-
-    return currentSnapshot.dataManifests(fileIO).stream()
-        .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
-        .collect(Collectors.toList());
-  }
-
-  private int targetNumManifests(long totalSizeBytes) {
-    return (int) ((totalSizeBytes + targetManifestSizeBytes - 1) / targetManifestSizeBytes);
-  }
-
-  private int targetNumManifestEntries(int numEntries, int numManifests) {
-    return (numEntries + numManifests - 1) / numManifests;
-  }
-
-  private boolean hasFileCounts(ManifestFile manifest) {
-    return manifest.addedFilesCount() != null &&
-        manifest.existingFilesCount() != null &&
-        manifest.deletedFilesCount() != null;
-  }
-
-  private void replaceManifests(Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> addedManifests) {
-    try {
-      boolean snapshotIdInheritanceEnabled = PropertyUtil.propertyAsBoolean(
-          table.properties(),
-          TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
-          TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
-
-      org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests();
-      deletedManifests.forEach(rewriteManifests::deleteManifest);
-      addedManifests.forEach(rewriteManifests::addManifest);
-      commit(rewriteManifests);
-
-      if (!snapshotIdInheritanceEnabled) {
-        // delete new manifests as they were rewritten before the commit
-        deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
-      }
-    } catch (CommitStateUnknownException commitStateUnknownException) {
-      // don't clean up added manifest files, because they may have been successfully committed.
-      throw commitStateUnknownException;
-    } catch (Exception e) {
-      // delete all new manifests because the rewrite failed
-      deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
-      throw e;
-    }
-  }
-
-  private void deleteFiles(Iterable<String> locations) {
-    Tasks.foreach(locations)
-        .executeWith(ThreadPools.getWorkerPool())
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-        .run(fileIO::deleteFile);
-  }
-
-  private static ManifestFile writeManifest(
-      List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
-      String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
-
-    String manifestName = "optimized-m-" + UUID.randomUUID();
-    Path manifestPath = new Path(location, manifestName);
-    OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
-
-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
-
-    ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
-
-    try {
-      for (int index = startIndex; index < endIndex; index++) {
-        Row row = rows.get(index);
-        long snapshotId = row.getLong(0);
-        long sequenceNumber = row.getLong(1);
-        Row file = row.getStruct(2);
-        writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber);
-      }
-    } finally {
-      writer.close();
-    }
-
-    return writer.toManifestFile();
-  }
-
-  private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<FileIO> io, long maxNumManifestEntries, String location,
-      int format, PartitionSpec spec, StructType sparkType) {
-
-    return rows -> {
-      List<Row> rowsAsList = Lists.newArrayList(rows);
-
-      if (rowsAsList.isEmpty()) {
-        return Collections.emptyIterator();
-      }
-
-      List<ManifestFile> manifests = Lists.newArrayList();
-      if (rowsAsList.size() <= maxNumManifestEntries) {
-        manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
-      } else {
-        int midIndex = rowsAsList.size() / 2;
-        manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
-        manifests.add(writeManifest(rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
-      }
-
-      return manifests.iterator();
-    };
+    super(spark, table);
   }
 }
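
The removed implementation above rewrites every manifest accepted by rewriteIf and writes the replacements to stagingLocation. A minimal sketch of the equivalent caller-side invocation; `spark` and `table` are placeholders, and the size threshold and staging path are made up:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteManifests;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class RewriteSmallManifests {
      public static RewriteManifests.Result run(SparkSession spark, Table table) {
        long smallManifestThreshold = 8L * 1024 * 1024; // hypothetical 8 MB cutoff
        return SparkActions.get(spark)
            .rewriteManifests(table)
            // only rewrite manifests below the threshold
            .rewriteIf(manifest -> manifest.length() < smallManifestThreshold)
            .stagingLocation("s3://bucket/warehouse/db/table/metadata-staging") // hypothetical path
            .execute();
      }
    }
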
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
index e584d33a3f..3305401e53 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
@@ -19,186 +19,19 @@
 
 package org.apache.iceberg.spark.actions;
 
-import java.util.Map;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.actions.BaseSnapshotTableActionResult;
-import org.apache.iceberg.actions.SnapshotTable;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.Spark3Util;
-import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.source.StagedSparkTable;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
 
 /**
- * Creates a new Iceberg table based on a source Spark table. The new Iceberg table will
- * have a different data and metadata directory allowing it to exist independently of the
- * source table.
+ * An action to snapshot a table as an Iceberg table.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ *             use {@link SparkActions} and {@link SnapshotTableSparkAction} instead.
  */
-public class BaseSnapshotTableSparkAction
-    extends BaseTableCreationSparkAction<SnapshotTable, SnapshotTable.Result>
-    implements SnapshotTable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BaseSnapshotTableSparkAction.class);
-
-  private StagingTableCatalog destCatalog;
-  private Identifier destTableIdent;
-  private String destTableLocation = null;
-
+@Deprecated
+public class BaseSnapshotTableSparkAction extends SnapshotTableSparkAction {
   BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
     super(spark, sourceCatalog, sourceTableIdent);
   }
-
-  @Override
-  protected SnapshotTable self() {
-    return this;
-  }
-
-  @Override
-  protected StagingTableCatalog destCatalog() {
-    return destCatalog;
-  }
-
-  @Override
-  protected Identifier destTableIdent() {
-    return destTableIdent;
-  }
-
-  @Override
-  public SnapshotTable as(String ident) {
-    String ctx = "snapshot destination";
-    CatalogPlugin defaultCatalog = spark().sessionState().catalogManager().currentCatalog();
-    CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark(), ident, defaultCatalog);
-    this.destCatalog = checkDestinationCatalog(catalogAndIdent.catalog());
-    this.destTableIdent = catalogAndIdent.identifier();
-    return this;
-  }
-
-  @Override
-  public SnapshotTable tableProperties(Map<String, String> properties) {
-    setProperties(properties);
-    return this;
-  }
-
-  @Override
-  public SnapshotTable tableProperty(String property, String value) {
-    setProperty(property, value);
-    return this;
-  }
-
-  @Override
-  public SnapshotTable.Result execute() {
-    String desc = String.format("Snapshotting table %s as %s", sourceTableIdent(), destTableIdent);
-    JobGroupInfo info = newJobGroupInfo("SNAPSHOT-TABLE", desc);
-    return withJobGroupInfo(info, this::doExecute);
-  }
-
-  private SnapshotTable.Result doExecute() {
-    Preconditions.checkArgument(destCatalog() != null && destTableIdent() != null,
-        "The destination catalog and identifier cannot be null. " +
-        "Make sure to configure the action with a valid destination table identifier via the `as` method.");
-
-    LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
-    StagedSparkTable stagedTable = stageDestTable();
-    Table icebergTable = stagedTable.table();
-
-    // TODO: Check the dest table location does not overlap with the source table location
-
-    boolean threw = true;
-    try {
-      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
-      ensureNameMappingPresent(icebergTable);
-
-      TableIdentifier v1TableIdent = v1SourceTable().identifier();
-      String stagingLocation = getMetadataLocation(icebergTable);
-      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
-      SparkTableUtil.importSparkTable(spark(), v1TableIdent, icebergTable, stagingLocation);
-
-      LOG.info("Committing staged changes to {}", destTableIdent());
-      stagedTable.commitStagedChanges();
-      threw = false;
-    } finally {
-      if (threw) {
-        LOG.error("Error when populating the staged table with metadata, aborting changes");
-
-        try {
-          stagedTable.abortStagedChanges();
-        } catch (Exception abortException) {
-          LOG.error("Cannot abort staged changes", abortException);
-        }
-      }
-    }
-
-    Snapshot snapshot = icebergTable.currentSnapshot();
-    long importedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
-    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", importedDataFilesCount, destTableIdent());
-    return new BaseSnapshotTableActionResult(importedDataFilesCount);
-  }
-
-  @Override
-  protected Map<String, String> destTableProps() {
-    Map<String, String> properties = Maps.newHashMap();
-
-    // copy over relevant source table props
-    properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
-    EXCLUDED_PROPERTIES.forEach(properties::remove);
-
-    // remove any possible location properties from origin properties
-    properties.remove(LOCATION);
-    properties.remove(TableProperties.WRITE_METADATA_LOCATION);
-    properties.remove(TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
-    properties.remove(TableProperties.OBJECT_STORE_PATH);
-    properties.remove(TableProperties.WRITE_DATA_LOCATION);
-
-    // set default and user-provided props
-    properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
-    properties.putAll(additionalProperties());
-
-    // make sure we mark this table as a snapshot table
-    properties.put(TableProperties.GC_ENABLED, "false");
-    properties.put("snapshot", "true");
-
-    // set the destination table location if provided
-    if (destTableLocation != null) {
-      properties.put(LOCATION, destTableLocation);
-    }
-
-    return properties;
-  }
-
-  @Override
-  protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
-    // currently the import code relies on being able to look up the table in the session catalog
-    Preconditions.checkArgument(catalog.name().equalsIgnoreCase("spark_catalog"),
-        "Cannot snapshot a table that isn't in the session catalog (i.e. spark_catalog). " +
-        "Found source catalog: %s.", catalog.name());
-
-    Preconditions.checkArgument(catalog instanceof TableCatalog,
-        "Cannot snapshot as catalog %s of class %s in not a table catalog",
-        catalog.name(), catalog.getClass().getName());
-
-    return (TableCatalog) catalog;
-  }
-
-  @Override
-  public SnapshotTable tableLocation(String location) {
-    Preconditions.checkArgument(!sourceTableLocation().equals(location),
-        "The snapshot table location cannot be same as the source table location. " +
-        "This would mix snapshot table files with original table files.");
-    this.destTableLocation = location;
-    return this;
-  }
 }
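
The deleted doExecute above stages a new Iceberg table at the destination set via as(), imports the source table's file metadata, and commits the staged changes. A caller-side sketch with hypothetical identifiers; the source must live in the session catalog, as checkSourceCatalog enforces:

    import org.apache.iceberg.actions.SnapshotTable;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class SnapshotHiveTable {
      public static SnapshotTable.Result run(SparkSession spark) {
        // both identifiers and the property value are placeholders
        return SparkActions.get(spark)
            .snapshotTable("spark_catalog.db.source_table")
            .as("iceberg_catalog.db.source_table_snapshot")
            .tableProperty("write.format.default", "parquet")
            .execute();
      }
    }
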
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
index 53fa06bbb5..7ed17d75dd 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
@@ -20,12 +20,10 @@
 package org.apache.iceberg.spark.actions;
 
 import java.util.Map;
-import org.apache.iceberg.actions.SnapshotUpdate;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.spark.sql.SparkSession;
 
-abstract class BaseSnapshotUpdateSparkAction<ThisT, R>
-    extends BaseSparkAction<ThisT, R> implements SnapshotUpdate<ThisT, R> {
+abstract class BaseSnapshotUpdateSparkAction<ThisT> extends BaseSparkAction<ThisT> {
 
   private final Map<String, String> summary = Maps.newHashMap();
 
@@ -33,7 +31,6 @@ abstract class BaseSnapshotUpdateSparkAction<ThisT, R>
     super(spark);
   }
 
-  @Override
   public ThisT snapshotProperty(String property, String value) {
     summary.put(property, value);
     return self();
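
snapshotProperty above lets any snapshot-producing action attach a custom key/value to the summary of the snapshot it commits. A short sketch on a bin-pack rewrite; the `compaction-job-id` key and its value are made up for illustration, and `spark`/`table` are placeholders:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class TagCompactionSnapshot {
      public static RewriteDataFiles.Result run(SparkSession spark, Table table) {
        return SparkActions.get(spark)
            .rewriteDataFiles(table)
            .binPack()
            // recorded in the snapshot summary of the rewrite commit
            .snapshotProperty("compaction-job-id", "job-1234")
            .execute();
      }
    }
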
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 04af49db6b..8b9821f40c 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -34,7 +34,6 @@ import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.actions.Action;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.ClosingIterator;
 import org.apache.iceberg.io.FileIO;
@@ -59,7 +58,7 @@ import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
 import static org.apache.spark.sql.functions.col;
 import static org.apache.spark.sql.functions.lit;
 
-abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {
+abstract class BaseSparkAction<ThisT> {
 
   protected static final String CONTENT_FILE = "Content File";
   protected static final String MANIFEST = "Manifest";
@@ -91,13 +90,11 @@ abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {
 
   protected abstract ThisT self();
 
-  @Override
   public ThisT option(String name, String value) {
     options.put(name, value);
     return self();
   }
 
-  @Override
   public ThisT options(Map<String, String> newOptions) {
     options.putAll(newOptions);
     return self();
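
option and options above accumulate string settings that concrete actions later read through options(). A sketch of passing settings in bulk, using the STREAM_RESULTS key that the expire action declares; `spark` and `table` are placeholders:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class ExpireWithBulkOptions {
      public static ExpireSnapshots.Result run(SparkSession spark, Table table) {
        // a single entry here, but options(Map) accepts any number of string settings
        Map<String, String> settings =
            Collections.singletonMap(ExpireSnapshotsSparkAction.STREAM_RESULTS, "true");
        return SparkActions.get(spark)
            .expireSnapshots(table)
            .options(settings)
            .execute();
      }
    }
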
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
index 6eadece65c..e3ddd7abc9 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
@@ -49,7 +49,7 @@ import org.apache.spark.sql.connector.catalog.V1Table;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 
-abstract class BaseTableCreationSparkAction<ThisT, R> extends BaseSparkAction<ThisT, R> {
+abstract class BaseTableCreationSparkAction<ThisT> extends BaseSparkAction<ThisT> {
   private static final Set<String> ALLOWED_SOURCES = ImmutableSet.of("parquet", "avro", "orc", "hive");
   protected static final String LOCATION = "location";
   protected static final String ICEBERG_METADATA_FOLDER = "metadata";
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
similarity index 94%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index dc58a05d4d..72b4f8f43a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -90,10 +90,10 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
  * <em>Note:</em> It is dangerous to call this action with a short retention interval as it might corrupt
  * the state of the table if another operation is writing at the same time.
  */
-public class BaseDeleteOrphanFilesSparkAction
-    extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> implements DeleteOrphanFiles {
+public class DeleteOrphanFilesSparkAction
+    extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements DeleteOrphanFiles {
 
-  private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
+  private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
   private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
     int lastIndex = path.lastIndexOf(File.separator);
     if (lastIndex == -1) {
@@ -119,7 +119,7 @@ public class BaseDeleteOrphanFilesSparkAction
   private Consumer<String> deleteFunc = defaultDelete;
   private ExecutorService deleteExecutorService = null;
 
-  public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
+  DeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
     super(spark);
 
     this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
@@ -133,35 +133,35 @@ public class BaseDeleteOrphanFilesSparkAction
   }
 
   @Override
-  protected DeleteOrphanFiles self() {
+  protected DeleteOrphanFilesSparkAction self() {
     return this;
   }
 
   @Override
-  public BaseDeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
+  public DeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
     this.deleteExecutorService = executorService;
     return this;
   }
 
   @Override
-  public BaseDeleteOrphanFilesSparkAction location(String newLocation) {
+  public DeleteOrphanFilesSparkAction location(String newLocation) {
     this.location = newLocation;
     return this;
   }
 
   @Override
-  public BaseDeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
+  public DeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
     this.olderThanTimestamp = newOlderThanTimestamp;
     return this;
   }
 
   @Override
-  public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
+  public DeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
     this.deleteFunc = newDeleteFunc;
     return this;
   }
 
-  public BaseDeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
+  public DeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
     StructType schema = files.schema();
 
     StructField filePathField = schema.apply(FILE_PATH);
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
similarity index 91%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
index fc2a5fa5cf..e840d90c2c 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
@@ -50,13 +50,13 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
  * to determine which files should be deleted.
  */
 @SuppressWarnings("UnnecessaryAnonymousClass")
-public class BaseDeleteReachableFilesSparkAction
-    extends BaseSparkAction<DeleteReachableFiles, DeleteReachableFiles.Result> implements DeleteReachableFiles {
+public class DeleteReachableFilesSparkAction
+    extends BaseSparkAction<DeleteReachableFilesSparkAction> implements DeleteReachableFiles {
 
   public static final String STREAM_RESULTS = "stream-results";
   public static final boolean STREAM_RESULTS_DEFAULT = false;
 
-  private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteReachableFilesSparkAction.class);
+  private static final Logger LOG = LoggerFactory.getLogger(DeleteReachableFilesSparkAction.class);
 
   private final String metadataFileLocation;
   private final Consumer<String> defaultDelete = new Consumer<String>() {
@@ -70,30 +70,30 @@ public class BaseDeleteReachableFilesSparkAction
   private ExecutorService deleteExecutorService = null;
   private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf());
 
-  public BaseDeleteReachableFilesSparkAction(SparkSession spark, String metadataFileLocation) {
+  DeleteReachableFilesSparkAction(SparkSession spark, String metadataFileLocation) {
     super(spark);
     this.metadataFileLocation = metadataFileLocation;
   }
 
   @Override
-  protected DeleteReachableFiles self() {
+  protected DeleteReachableFilesSparkAction self() {
     return this;
   }
 
   @Override
-  public DeleteReachableFiles io(FileIO fileIO) {
+  public DeleteReachableFilesSparkAction io(FileIO fileIO) {
     this.io = fileIO;
     return this;
   }
 
   @Override
-  public DeleteReachableFiles deleteWith(Consumer<String> newDeleteFunc) {
+  public DeleteReachableFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
     this.deleteFunc = newDeleteFunc;
     return this;
   }
 
   @Override
-  public DeleteReachableFiles executeDeleteWith(ExecutorService executorService) {
+  public DeleteReachableFilesSparkAction executeDeleteWith(ExecutorService executorService) {
     this.deleteExecutorService = executorService;
     return this;
   }
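
The hunk above narrows io, deleteWith and executeDeleteWith to return the concrete action class. A sketch of removing all files reachable from a metadata.json, assuming the deleteReachableFiles(String) factory on SparkActions that constructs this action (its constructor becomes package-private here); the metadata location is hypothetical, and the FileIO mirrors the HadoopFileIO default shown earlier in the hunk:

    import org.apache.iceberg.actions.DeleteReachableFiles;
    import org.apache.iceberg.hadoop.HadoopFileIO;
    import org.apache.iceberg.spark.actions.DeleteReachableFilesSparkAction;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class DropReachableFiles {
      public static DeleteReachableFiles.Result run(SparkSession spark) {
        // hypothetical metadata file of a table that is being dropped
        String metadataLocation = "s3://bucket/warehouse/db/table/metadata/v42.metadata.json";
        return SparkActions.get(spark)
            .deleteReachableFiles(metadataLocation)
            .io(new HadoopFileIO(spark.sessionState().newHadoopConf()))
            .option(DeleteReachableFilesSparkAction.STREAM_RESULTS, "true")
            .execute();
      }
    }
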
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
similarity index 93%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 2216383337..f3fc7bdc14 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -65,13 +65,13 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
  * Deletes are still performed locally after retrieving the results from the Spark executors.
  */
 @SuppressWarnings("UnnecessaryAnonymousClass")
-public class BaseExpireSnapshotsSparkAction
-    extends BaseSparkAction<ExpireSnapshots, ExpireSnapshots.Result> implements ExpireSnapshots {
+public class ExpireSnapshotsSparkAction
+    extends BaseSparkAction<ExpireSnapshotsSparkAction> implements ExpireSnapshots {
 
   public static final String STREAM_RESULTS = "stream-results";
   public static final boolean STREAM_RESULTS_DEFAULT = false;
 
-  private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsSparkAction.class);
 
   private final Table table;
   private final TableOperations ops;
@@ -89,7 +89,7 @@ public class BaseExpireSnapshotsSparkAction
   private ExecutorService deleteExecutorService = null;
   private Dataset<Row> expiredFiles = null;
 
-  public BaseExpireSnapshotsSparkAction(SparkSession spark, Table table) {
+  ExpireSnapshotsSparkAction(SparkSession spark, Table table) {
     super(spark);
     this.table = table;
     this.ops = ((HasTableOperations) table).operations();
@@ -100,30 +100,30 @@ public class BaseExpireSnapshotsSparkAction
   }
 
   @Override
-  protected ExpireSnapshots self() {
+  protected ExpireSnapshotsSparkAction self() {
     return this;
   }
 
   @Override
-  public BaseExpireSnapshotsSparkAction executeDeleteWith(ExecutorService executorService) {
+  public ExpireSnapshotsSparkAction executeDeleteWith(ExecutorService executorService) {
     this.deleteExecutorService = executorService;
     return this;
   }
 
   @Override
-  public BaseExpireSnapshotsSparkAction expireSnapshotId(long snapshotId) {
+  public ExpireSnapshotsSparkAction expireSnapshotId(long snapshotId) {
     expiredSnapshotIds.add(snapshotId);
     return this;
   }
 
   @Override
-  public BaseExpireSnapshotsSparkAction expireOlderThan(long timestampMillis) {
+  public ExpireSnapshotsSparkAction expireOlderThan(long timestampMillis) {
     this.expireOlderThanValue = timestampMillis;
     return this;
   }
 
   @Override
-  public BaseExpireSnapshotsSparkAction retainLast(int numSnapshots) {
+  public ExpireSnapshotsSparkAction retainLast(int numSnapshots) {
     Preconditions.checkArgument(1 <= numSnapshots,
         "Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots);
     this.retainLastValue = numSnapshots;
@@ -131,7 +131,7 @@ public class BaseExpireSnapshotsSparkAction
   }
 
   @Override
-  public BaseExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc) {
+  public ExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc) {
     this.deleteFunc = newDeleteFunc;
     return this;
   }
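
expireOlderThan, expireSnapshotId, retainLast and the delete hooks above make up the expire API. A sketch with placeholder values that keeps the last 50 snapshots, expires anything older than a week, and parallelizes file deletes through an executor:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class ExpireOldSnapshots {
      public static ExpireSnapshots.Result run(SparkSession spark, Table table) {
        ExecutorService deletePool = Executors.newFixedThreadPool(8); // placeholder pool size
        try {
          return SparkActions.get(spark)
              .expireSnapshots(table)
              .expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7))
              .retainLast(50)
              .executeDeleteWith(deletePool)
              .execute();
        } finally {
          deletePool.shutdown();
        }
      }
    }
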
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
similarity index 93%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
index ef9f0d3e25..7146bffcbe 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
@@ -50,18 +50,18 @@ import scala.collection.JavaConverters;
  * previously referred to a non-Iceberg table will refer to the newly migrated Iceberg
  * table.
  */
-public class BaseMigrateTableSparkAction
-    extends BaseTableCreationSparkAction<MigrateTable, MigrateTable.Result>
+public class MigrateTableSparkAction
+    extends BaseTableCreationSparkAction<MigrateTableSparkAction>
     implements MigrateTable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(BaseMigrateTableSparkAction.class);
+  private static final Logger LOG = LoggerFactory.getLogger(MigrateTableSparkAction.class);
   private static final String BACKUP_SUFFIX = "_BACKUP_";
 
   private final StagingTableCatalog destCatalog;
   private final Identifier destTableIdent;
   private final Identifier backupIdent;
 
-  public BaseMigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+  MigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
     super(spark, sourceCatalog, sourceTableIdent);
     this.destCatalog = checkDestinationCatalog(sourceCatalog);
     this.destTableIdent = sourceTableIdent;
@@ -70,7 +70,7 @@ public class BaseMigrateTableSparkAction
   }
 
   @Override
-  protected MigrateTable self() {
+  protected MigrateTableSparkAction self() {
     return this;
   }
 
@@ -85,13 +85,13 @@ public class BaseMigrateTableSparkAction
   }
 
   @Override
-  public MigrateTable tableProperties(Map<String, String> properties) {
+  public MigrateTableSparkAction tableProperties(Map<String, String> properties) {
     setProperties(properties);
     return this;
   }
 
   @Override
-  public MigrateTable tableProperty(String property, String value) {
+  public MigrateTableSparkAction tableProperty(String property, String value) {
     setProperty(property, value);
     return this;
   }
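
A sketch of the migrate counterpart, with a hypothetical session-catalog identifier and an example property; tableProperty behaves as shown above, and the source table is kept under the backup identifier derived from the BACKUP_SUFFIX declared in this class:

    import org.apache.iceberg.actions.MigrateTable;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class MigrateHiveTable {
      public static MigrateTable.Result run(SparkSession spark) {
        // identifier is hypothetical; migrate operates on tables in the session catalog
        return SparkActions.get(spark)
            .migrateTable("spark_catalog.db.legacy_parquet_table")
            .tableProperty("format-version", "2")
            .execute();
      }
    }
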
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
similarity index 96%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index d1937fcbcd..bb2cbd8329 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -72,10 +72,11 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BaseRewriteDataFilesSparkAction
-    extends BaseSnapshotUpdateSparkAction<RewriteDataFiles, RewriteDataFiles.Result> implements RewriteDataFiles {
+public class RewriteDataFilesSparkAction
+    extends BaseSnapshotUpdateSparkAction<RewriteDataFilesSparkAction>
+    implements RewriteDataFiles {
 
-  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesSparkAction.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesSparkAction.class);
   private static final Set<String> VALID_OPTIONS = ImmutableSet.of(
       MAX_CONCURRENT_FILE_GROUP_REWRITES,
       MAX_FILE_GROUP_SIZE_BYTES,
@@ -96,18 +97,18 @@ public class BaseRewriteDataFilesSparkAction
   private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
 
-  protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
+  RewriteDataFilesSparkAction(SparkSession spark, Table table) {
     super(spark);
     this.table = table;
   }
 
   @Override
-  protected RewriteDataFiles self() {
+  protected RewriteDataFilesSparkAction self() {
     return this;
   }
 
   @Override
-  public RewriteDataFiles binPack() {
+  public RewriteDataFilesSparkAction binPack() {
     Preconditions.checkArgument(this.strategy == null,
         "Cannot set strategy to binpack, it has already been set", this.strategy);
     this.strategy = binPackStrategy();
@@ -115,7 +116,7 @@ public class BaseRewriteDataFilesSparkAction
   }
 
   @Override
-  public RewriteDataFiles sort(SortOrder sortOrder) {
+  public RewriteDataFilesSparkAction sort(SortOrder sortOrder) {
     Preconditions.checkArgument(this.strategy == null,
         "Cannot set strategy to sort, it has already been set to %s", this.strategy);
     this.strategy = sortStrategy().sortOrder(sortOrder);
@@ -123,7 +124,7 @@ public class BaseRewriteDataFilesSparkAction
   }
 
   @Override
-  public RewriteDataFiles sort() {
+  public RewriteDataFilesSparkAction sort() {
     Preconditions.checkArgument(this.strategy == null,
         "Cannot set strategy to sort, it has already been set to %s", this.strategy);
     this.strategy = sortStrategy();
@@ -131,7 +132,7 @@ public class BaseRewriteDataFilesSparkAction
   }
 
   @Override
-  public RewriteDataFiles zOrder(String... columnNames) {
+  public RewriteDataFilesSparkAction zOrder(String... columnNames) {
     Preconditions.checkArgument(this.strategy == null,
         "Cannot set strategy to zorder, it has already been set to %s", this.strategy);
     this.strategy = zOrderStrategy(columnNames);
@@ -139,7 +140,7 @@ public class BaseRewriteDataFilesSparkAction
   }
 
   @Override
-  public RewriteDataFiles filter(Expression expression) {
+  public RewriteDataFilesSparkAction filter(Expression expression) {
     filter = Expressions.and(filter, expression);
     return this;
   }
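
binPack, sort, zOrder and filter above are chained configuration calls on the same action, with exactly one strategy allowed per run. A sketch combining a filter with a z-order rewrite; `spark` and `table` are placeholders and the column names are made up:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class ZOrderRecentPartition {
      public static RewriteDataFiles.Result run(SparkSession spark, Table table) {
        return SparkActions.get(spark)
            .rewriteDataFiles(table)
            // "event_date", "device_id" and "event_type" are hypothetical column names
            .filter(Expressions.greaterThanOrEqual("event_date", "2022-07-01"))
            .zOrder("device_id", "event_type")
            .execute();
      }
    }
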
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
similarity index 95%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 1648dd26f8..99e51a37aa 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -79,14 +79,14 @@ import static org.apache.iceberg.MetadataTableType.ENTRIES;
  * and a custom spec id to {@link #specId(int)}. In addition, there is a way to configure a custom location
  * for new manifests via {@link #stagingLocation}.
  */
-public class BaseRewriteManifestsSparkAction
-    extends BaseSnapshotUpdateSparkAction<RewriteManifests, RewriteManifests.Result>
+public class RewriteManifestsSparkAction
+    extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction>
     implements RewriteManifests {
 
-  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteManifestsSparkAction.class);
+  public static final String USE_CACHING = "use-caching";
+  public static final boolean USE_CACHING_DEFAULT = true;
 
-  private static final String USE_CACHING = "use-caching";
-  private static final boolean USE_CACHING_DEFAULT = true;
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteManifestsSparkAction.class);
 
   private final Encoder<ManifestFile> manifestEncoder;
   private final Table table;
@@ -98,7 +98,7 @@ public class BaseRewriteManifestsSparkAction
   private Predicate<ManifestFile> predicate = manifest -> true;
   private String stagingLocation = null;
 
-  public BaseRewriteManifestsSparkAction(SparkSession spark, Table table) {
+  RewriteManifestsSparkAction(SparkSession spark, Table table) {
     super(spark);
     this.manifestEncoder = Encoders.javaSerialization(ManifestFile.class);
     this.table = table;
@@ -119,25 +119,25 @@ public class BaseRewriteManifestsSparkAction
   }
 
   @Override
-  protected RewriteManifests self() {
+  protected RewriteManifestsSparkAction self() {
     return this;
   }
 
   @Override
-  public RewriteManifests specId(int specId) {
+  public RewriteManifestsSparkAction specId(int specId) {
     Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %s", specId);
     this.spec = table.specs().get(specId);
     return this;
   }
 
   @Override
-  public RewriteManifests rewriteIf(Predicate<ManifestFile> newPredicate) {
+  public RewriteManifestsSparkAction rewriteIf(Predicate<ManifestFile> newPredicate) {
     this.predicate = newPredicate;
     return this;
   }
 
   @Override
-  public RewriteManifests stagingLocation(String newStagingLocation) {
+  public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) {
     this.stagingLocation = newStagingLocation;
     return this;
   }
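
USE_CACHING becomes a public constant in this hunk, so callers can disable dataset caching by key instead of a hard-coded string. A short sketch reusing placeholder `spark` and `table` handles:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteManifests;
    import org.apache.iceberg.spark.actions.RewriteManifestsSparkAction;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class RewriteManifestsWithoutCaching {
      public static RewriteManifests.Result run(SparkSession spark, Table table) {
        return SparkActions.get(spark)
            .rewriteManifests(table)
            // skip caching the manifest-entry dataset during the rewrite
            .option(RewriteManifestsSparkAction.USE_CACHING, "false")
            .execute();
      }
    }
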
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
similarity index 92%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
index e584d33a3f..526b46af1a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
@@ -48,22 +48,22 @@ import scala.collection.JavaConverters;
  * have a different data and metadata directory allowing it to exist independently of the
  * source table.
  */
-public class BaseSnapshotTableSparkAction
-    extends BaseTableCreationSparkAction<SnapshotTable, SnapshotTable.Result>
+public class SnapshotTableSparkAction
+    extends BaseTableCreationSparkAction<SnapshotTableSparkAction>
     implements SnapshotTable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(BaseSnapshotTableSparkAction.class);
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotTableSparkAction.class);
 
   private StagingTableCatalog destCatalog;
   private Identifier destTableIdent;
   private String destTableLocation = null;
 
-  BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+  SnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
     super(spark, sourceCatalog, sourceTableIdent);
   }
 
   @Override
-  protected SnapshotTable self() {
+  protected SnapshotTableSparkAction self() {
     return this;
   }
 
@@ -78,7 +78,7 @@ public class BaseSnapshotTableSparkAction
   }
 
   @Override
-  public SnapshotTable as(String ident) {
+  public SnapshotTableSparkAction as(String ident) {
     String ctx = "snapshot destination";
     CatalogPlugin defaultCatalog = spark().sessionState().catalogManager().currentCatalog();
     CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark(), ident, defaultCatalog);
@@ -88,13 +88,13 @@ public class BaseSnapshotTableSparkAction
   }
 
   @Override
-  public SnapshotTable tableProperties(Map<String, String> properties) {
+  public SnapshotTableSparkAction tableProperties(Map<String, String> properties) {
     setProperties(properties);
     return this;
   }
 
   @Override
-  public SnapshotTable tableProperty(String property, String value) {
+  public SnapshotTableSparkAction tableProperty(String property, String value) {
     setProperty(property, value);
     return this;
   }
@@ -194,7 +194,7 @@ public class BaseSnapshotTableSparkAction
   }
 
   @Override
-  public SnapshotTable tableLocation(String location) {
+  public SnapshotTableSparkAction tableLocation(String location) {
     Preconditions.checkArgument(!sourceTableLocation().equals(location),
         "The snapshot table location cannot be same as the source table location. " +
         "This would mix snapshot table files with original table files.");
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index d77a13d128..7131f869f3 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -21,13 +21,6 @@ package org.apache.iceberg.spark.actions;
 
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.ActionsProvider;
-import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.actions.DeleteReachableFiles;
-import org.apache.iceberg.actions.ExpireSnapshots;
-import org.apache.iceberg.actions.MigrateTable;
-import org.apache.iceberg.actions.RewriteDataFiles;
-import org.apache.iceberg.actions.RewriteManifests;
-import org.apache.iceberg.actions.SnapshotTable;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
 import org.apache.spark.sql.SparkSession;
@@ -56,7 +49,7 @@ public class SparkActions implements ActionsProvider {
   }
 
   @Override
-  public SnapshotTable snapshotTable(String tableIdent) {
+  public SnapshotTableSparkAction snapshotTable(String tableIdent) {
     String ctx = "snapshot source";
     CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog();
     CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark, tableIdent, defaultCatalog);
@@ -64,7 +57,7 @@ public class SparkActions implements ActionsProvider {
   }
 
   @Override
-  public MigrateTable migrateTable(String tableIdent) {
+  public MigrateTableSparkAction migrateTable(String tableIdent) {
     String ctx = "migrate target";
     CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog();
     CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark, tableIdent, defaultCatalog);
@@ -72,27 +65,27 @@ public class SparkActions implements ActionsProvider {
   }
 
   @Override
-  public RewriteDataFiles rewriteDataFiles(Table table) {
+  public RewriteDataFilesSparkAction rewriteDataFiles(Table table) {
     return new BaseRewriteDataFilesSparkAction(spark, table);
   }
 
   @Override
-  public DeleteOrphanFiles deleteOrphanFiles(Table table) {
+  public DeleteOrphanFilesSparkAction deleteOrphanFiles(Table table) {
     return new BaseDeleteOrphanFilesSparkAction(spark, table);
   }
 
   @Override
-  public RewriteManifests rewriteManifests(Table table) {
+  public RewriteManifestsSparkAction rewriteManifests(Table table) {
     return new BaseRewriteManifestsSparkAction(spark, table);
   }
 
   @Override
-  public ExpireSnapshots expireSnapshots(Table table) {
+  public ExpireSnapshotsSparkAction expireSnapshots(Table table) {
     return new BaseExpireSnapshotsSparkAction(spark, table);
   }
 
   @Override
-  public DeleteReachableFiles deleteReachableFiles(String metadataLocation) {
+  public DeleteReachableFilesSparkAction deleteReachableFiles(String metadataLocation) {
     return new BaseDeleteReachableFilesSparkAction(spark, metadataLocation);
   }
 }
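
Because SparkActions now declares the concrete Spark action classes as return types, Spark-only methods such as compareToFileList(...) are reachable without the casts that the procedures and tests below are dropping. A sketch under assumed inputs (an active SparkSession, a loaded Table, and an illustrative temp view of known files):

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

class DeleteOrphanFilesExample {
  static DeleteOrphanFiles.Result deleteOrphans(SparkSession spark, Table table) {
    // Previously this required ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table));
    // the provider now returns DeleteOrphanFilesSparkAction directly, so no cast is needed.
    return SparkActions.get()
        .deleteOrphanFiles(table)
        .compareToFileList(spark.table("known_files"))  // view name is illustrative
        .olderThan(System.currentTimeMillis())
        .execute();
  }
}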
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 042fa6e4bb..272cacc4d4 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -22,7 +22,7 @@ package org.apache.iceberg.spark.procedures;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.spark.actions.BaseExpireSnapshotsSparkAction;
+import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.util.DateTimeUtil;
@@ -108,7 +108,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
       }
 
       if (streamResult != null) {
-        action.option(BaseExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));
+        action.option(ExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));
       }
 
       ExpireSnapshots.Result result = action.execute();
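
The STREAM_RESULTS constant moves with the class, so callers can reference ExpireSnapshotsSparkAction.STREAM_RESULTS directly, as the procedure above now does. A sketch with an assumed Table handle and an illustrative 7-day cutoff:

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;

class ExpireSnapshotsExample {
  // Expires snapshots older than one week and streams deletes instead of collecting them.
  static ExpireSnapshots.Result expireOldSnapshots(Table table) {
    return SparkActions.get()
        .expireSnapshots(table)
        .expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7))
        .option(ExpireSnapshotsSparkAction.STREAM_RESULTS, "true")
        .execute();
  }
}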
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index 23d6471908..a64a28b631 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -24,7 +24,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.spark.actions.BaseDeleteOrphanFilesSparkAction;
+import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.util.DateTimeUtil;
@@ -95,7 +95,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
             "max_concurrent_deletes should have value > 0,  value: " + maxConcurrentDeletes);
 
     return withIcebergTable(tableIdent, table -> {
-      DeleteOrphanFiles action = actions().deleteOrphanFiles(table);
+      DeleteOrphanFilesSparkAction action = actions().deleteOrphanFiles(table);
 
       if (olderThanMillis != null) {
         boolean isTesting = Boolean.parseBoolean(spark().conf().get("spark.testing", "false"));
@@ -118,7 +118,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
       }
 
       if (fileListView != null) {
-        ((BaseDeleteOrphanFilesSparkAction) action).compareToFileList(spark().table(fileListView));
+        action.compareToFileList(spark().table(fileListView));
       }
 
       DeleteOrphanFiles.Result result = action.execute();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
index eae7ce0fca..1cc76501c0 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.spark.procedures;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.actions.RewriteManifestsSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -82,10 +83,10 @@ class RewriteManifestsProcedure extends BaseProcedure {
     Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
 
     return modifyIcebergTable(tableIdent, table -> {
-      RewriteManifests action = actions().rewriteManifests(table);
+      RewriteManifestsSparkAction action = actions().rewriteManifests(table);
 
       if (useCaching != null) {
-        action.option("use-caching", useCaching.toString());
+        action.option(RewriteManifestsSparkAction.USE_CACHING, useCaching.toString());
       }
 
       RewriteManifests.Result result = action.execute();
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 2c324a88f1..232a85f984 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -1096,7 +1096,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Set<String> deletedFiles = Sets.newHashSet();
 
-    BaseExpireSnapshotsSparkAction action = (BaseExpireSnapshotsSparkAction) SparkActions.get().expireSnapshots(table)
+    ExpireSnapshotsSparkAction action = SparkActions.get().expireSnapshots(table)
         .expireOlderThan(tAfterCommits)
         .deleteWith(deletedFiles::add);
     Dataset<Row> pendingDeletes = action.expire();
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index a56e888207..1d9e479397 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -892,7 +892,7 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
             .withColumnRenamed("lastModified", "last_modified");
 
     DeleteOrphanFiles.Result result1 =
-        ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
+        actions.deleteOrphanFiles(table)
             .compareToFileList(compareToFileList)
             .deleteWith(s -> { })
             .execute();
@@ -901,7 +901,7 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
         Iterables.isEmpty(result1.orphanFileLocations()));
 
     DeleteOrphanFiles.Result result2 =
-        ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
+        actions.deleteOrphanFiles(table)
             .compareToFileList(compareToFileList)
             .olderThan(System.currentTimeMillis())
             .deleteWith(s -> { })
@@ -912,7 +912,7 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
         "Invalid file should be present", fs.exists(new Path(invalidFilePaths.get(0))));
 
     DeleteOrphanFiles.Result result3 =
-        ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
+        actions.deleteOrphanFiles(table)
             .compareToFileList(compareToFileList)
             .olderThan(System.currentTimeMillis())
             .execute();
@@ -940,7 +940,7 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
             .withColumnRenamed("lastModified", "last_modified");
 
     DeleteOrphanFiles.Result result4 =
-        ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
+        actions.deleteOrphanFiles(table)
             .compareToFileList(compareToFileListWithOutsideLocation)
             .deleteWith(s -> { })
             .execute();
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index e0b1e53cfe..db08619597 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -79,7 +79,7 @@ import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
-import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
+import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.RewriteExecutionContext;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Conversions;