Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/13 04:03:06 UTC

[iceberg] branch master updated: Spark: Refactor snapshot and migrate actions (#2437)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6eaabb5  Spark: Refactor snapshot and migrate actions (#2437)
6eaabb5 is described below

commit 6eaabb59653f51bdfbb4063ebe14632b1f044d96
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Apr 12 21:02:48 2021 -0700

    Spark: Refactor snapshot and migrate actions (#2437)
---
 .../actions/BaseMigrateTableActionResult.java      |  14 +-
 .../actions/BaseSnapshotTableActionResult.java     |  14 +-
 .../org/apache/iceberg/actions/CreateAction.java   |   1 +
 .../org/apache/iceberg/actions/SnapshotAction.java |   1 +
 .../iceberg/actions/Spark3MigrateAction.java       | 146 +++------------------
 .../iceberg/actions/Spark3SnapshotAction.java      | 119 +++--------------
 .../actions/BaseMigrateTableSparkAction.java}      |  91 +++++++++----
 .../actions/BaseSnapshotTableSparkAction.java}     | 124 ++++++++++++-----
 .../actions/BaseTableCreationSparkAction.java}     |  74 +++--------
 9 files changed, 234 insertions(+), 350 deletions(-)
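
In short: the Spark3 create actions are split into a shared base class plus new
MigrateTable/SnapshotTable implementations under org.apache.iceberg.spark.actions,
while the old classes become deprecated shims. A minimal sketch of the new call
pattern, based only on the classes in this diff (the SparkSession, catalog, and
identifier plumbing is assumed to be set up by the caller):

    import org.apache.iceberg.actions.MigrateTable;
    import org.apache.iceberg.spark.actions.BaseMigrateTableSparkAction;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.connector.catalog.CatalogPlugin;
    import org.apache.spark.sql.connector.catalog.Identifier;

    class MigrateExample {
      static long migrate(SparkSession spark, CatalogPlugin catalog, Identifier ident) {
        MigrateTable.Result result = new BaseMigrateTableSparkAction(spark, catalog, ident)
            .tableProperty("migrated-by", "data-eng")  // fluent setters return the action itself
            .execute();
        return result.migratedDataFilesCount();
      }
    }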

diff --git a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java
similarity index 71%
copy from spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
copy to core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java
index cb1a559..4d7397b 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java
@@ -19,6 +19,16 @@
 
 package org.apache.iceberg.actions;
 
-public interface SnapshotAction extends CreateAction {
-  SnapshotAction withLocation(String location);
+public class BaseMigrateTableActionResult implements MigrateTable.Result {
+
+  private final long migratedDataFilesCount;
+
+  public BaseMigrateTableActionResult(long migratedDataFilesCount) {
+    this.migratedDataFilesCount = migratedDataFilesCount;
+  }
+
+  @Override
+  public long migratedDataFilesCount() {
+    return migratedDataFilesCount;
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java
similarity index 70%
copy from spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
copy to core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java
index cb1a559..3ea24d3 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java
@@ -19,6 +19,16 @@
 
 package org.apache.iceberg.actions;
 
-public interface SnapshotAction extends CreateAction {
-  SnapshotAction withLocation(String location);
+public class BaseSnapshotTableActionResult implements SnapshotTable.Result {
+
+  private final long importedDataFilesCount;
+
+  public BaseSnapshotTableActionResult(long importedDataFilesCount) {
+    this.importedDataFilesCount = importedDataFilesCount;
+  }
+
+  @Override
+  public long importedDataFilesCount() {
+    return importedDataFilesCount;
+  }
 }
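
Both result classes follow the same pattern: an immutable count captured at
construction and exposed through a named accessor, replacing the bare Long the
deprecated actions returned. A trivial sketch of how a caller consumes one:

    // the count is fixed at construction time; 42 here is just an example value
    SnapshotTable.Result result = new BaseSnapshotTableActionResult(42L);
    long imported = result.importedDataFilesCount();  // 42
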
diff --git a/spark/src/main/java/org/apache/iceberg/actions/CreateAction.java b/spark/src/main/java/org/apache/iceberg/actions/CreateAction.java
index 6760dd1..a567bda 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/CreateAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/CreateAction.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.actions;
 
 import java.util.Map;
 
+@Deprecated
 public interface CreateAction extends Action<CreateAction, Long> {
 
   /**
diff --git a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java b/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
index cb1a559..6c3e8e6 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.actions;
 
+@Deprecated
 public interface SnapshotAction extends CreateAction {
   SnapshotAction withLocation(String location);
 }
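
The @Deprecated markers on CreateAction and SnapshotAction point callers at the
new MigrateTable and SnapshotTable interfaces in core. Those interfaces are not
part of this diff; reconstructed from how the code below uses them, MigrateTable
looks roughly like this (exact javadoc and method order are assumptions):

    // sketch of org.apache.iceberg.actions.MigrateTable, inferred from this diff
    public interface MigrateTable extends Action<MigrateTable, MigrateTable.Result> {
      // set or override table properties on the destination Iceberg table
      MigrateTable tableProperties(Map<String, String> properties);

      MigrateTable tableProperty(String name, String value);

      interface Result {
        // number of data files registered in the new Iceberg table
        long migratedDataFilesCount();
      }
    }
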
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java b/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
index e2df5ee..b5c777d 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
@@ -20,157 +20,43 @@
 package org.apache.iceberg.actions;
 
 import java.util.Map;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.AlreadyExistsException;
-import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.SparkSessionCatalog;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.spark.actions.BaseMigrateTableSparkAction;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Some;
-import scala.collection.JavaConverters;
 
 /**
  * Takes a Spark table in the sourceCatalog and attempts to transform it into an Iceberg
  * Table in the same location with the same identifier. Once complete the identifier which
  * previously referred to a non-iceberg table will refer to the newly migrated iceberg
  * table.
+ *
+ * @deprecated since 0.12.0, will be removed in 0.13.0; use {@link MigrateTable} instead.
  */
-public class Spark3MigrateAction extends Spark3CreateAction {
-  private static final Logger LOG = LoggerFactory.getLogger(Spark3MigrateAction.class);
-  private static final String BACKUP_SUFFIX = "_BACKUP_";
+@Deprecated
+public class Spark3MigrateAction implements CreateAction {
 
-  private final Identifier backupIdent;
+  private final MigrateTable delegate;
 
   public Spark3MigrateAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableName) {
-    super(spark, sourceCatalog, sourceTableName, sourceCatalog, sourceTableName);
-    String backupName = sourceTableIdent().name() + BACKUP_SUFFIX;
-    this.backupIdent = Identifier.of(sourceTableIdent().namespace(), backupName);
-  }
-
-  private Long doExecute() {
-    LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
-
-    // Move source table to a new name, halting all modifications and allowing us to stage
-    // the creation of a new Iceberg table in its place
-    renameAndBackupSourceTable();
-
-    StagedSparkTable stagedTable = null;
-    Table icebergTable;
-    boolean threw = true;
-    try {
-      LOG.info("Staging a new Iceberg table {}", destTableIdent());
-      stagedTable = stageDestTable();
-      icebergTable = stagedTable.table();
-
-      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
-      ensureNameMappingPresent(icebergTable);
-
-      String stagingLocation = getMetadataLocation(icebergTable);
-      Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
-      TableIdentifier v1BackupIdentifier = new TableIdentifier(backupIdent.name(), backupNamespace);
-      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
-      SparkTableUtil.importSparkTable(spark(), v1BackupIdentifier, icebergTable, stagingLocation);
-
-      LOG.info("Committing staged changes to {}", destTableIdent());
-      stagedTable.commitStagedChanges();
-      threw = false;
-    } finally {
-      if (threw) {
-        LOG.error("Error when attempting perform migration changes, aborting table creation and restoring backup.");
-
-        restoreSourceTable();
-
-        if (stagedTable != null) {
-          try {
-            stagedTable.abortStagedChanges();
-          } catch (Exception abortException) {
-            LOG.error("Cannot abort staged changes", abortException);
-          }
-        }
-      }
-    }
-
-    Snapshot snapshot = icebergTable.currentSnapshot();
-    long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
-    LOG.info("Successfully loaded Iceberg metadata for {} files", numMigratedFiles);
-    return numMigratedFiles;
+    this.delegate = new BaseMigrateTableSparkAction(spark, sourceCatalog, sourceTableName);
   }
 
   @Override
-  public Long execute() {
-    JobGroupInfo info = new JobGroupInfo("MIGRATE", "MIGRATE", false);
-    return withJobGroupInfo(info, this::doExecute);
+  public CreateAction withProperties(Map<String, String> properties) {
+    delegate.tableProperties(properties);
+    return this;
   }
 
   @Override
-  protected Map<String, String> targetTableProps() {
-    Map<String, String> properties = Maps.newHashMap();
-
-    // copy over relevant source table props
-    properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
-    EXCLUDED_PROPERTIES.forEach(properties::remove);
-
-    // set default and user-provided props
-    properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
-    properties.putAll(additionalProperties());
-
-    // make sure we mark this table as migrated
-    properties.put("migrated", "true");
-
-    // inherit the source table location
-    properties.putIfAbsent(LOCATION, sourceTableLocation());
-
-    return properties;
+  public CreateAction withProperty(String key, String value) {
+    delegate.tableProperty(key, value);
+    return this;
   }
 
   @Override
-  protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
-    // Currently the Import code relies on being able to look up the table in the session code
-    Preconditions.checkArgument(catalog instanceof SparkSessionCatalog,
-        "Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found %s of class %s as the source catalog.",
-        catalog.name(), catalog.getClass().getName());
-
-    return (TableCatalog) catalog;
-  }
-
-  private void renameAndBackupSourceTable() {
-    try {
-      LOG.info("Renaming {} as {} for backup", sourceTableIdent(), backupIdent);
-      destCatalog().renameTable(sourceTableIdent(), backupIdent);
-
-    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
-      throw new NoSuchTableException("Cannot find source table %s", sourceTableIdent());
-
-    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
-      throw new AlreadyExistsException(
-          "Cannot rename %s as %s for backup. The backup table already exists.",
-          sourceTableIdent(), backupIdent);
-    }
-  }
-
-  private void restoreSourceTable() {
-    try {
-      LOG.info("Restoring {} from {}", sourceTableIdent(), backupIdent);
-      destCatalog().renameTable(backupIdent, sourceTableIdent());
-
-    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
-      LOG.error("Cannot restore the original table, the backup table {} cannot be found", backupIdent, e);
-
-    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
-      LOG.error("Cannot restore the original table, a table with the original name exists. " +
-          "Use the backup table {} to restore the original table manually.", backupIdent, e);
-    }
+  public Long execute() {
+    MigrateTable.Result result = delegate.execute();
+    return result.migratedDataFilesCount();
   }
 }
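
The deprecated action is now a thin facade: every configuration call forwards to
the new BaseMigrateTableSparkAction, and execute() unwraps the typed result back
into the Long the old contract promised. Existing call sites therefore compile
and behave unchanged, for example:

    // sketch of an unchanged legacy call site
    Long migratedFiles = new Spark3MigrateAction(spark, sourceCatalog, sourceTableIdent)
        .withProperty("migrated-by", "data-eng")  // forwards to delegate.tableProperty(...)
        .execute();  // internally: delegate.execute().migratedDataFilesCount()
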
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java b/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
index c4ed14d..9948350 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
@@ -20,132 +20,51 @@
 package org.apache.iceberg.actions;
 
 import java.util.Map;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.spark.actions.BaseSnapshotTableSparkAction;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
 
 /**
  * Creates a new Iceberg table based on a source Spark table. The new Iceberg table will
  * have a different data and metadata directory allowing it to exist independently of the
  * source table.
+ *
+ * @deprecated since 0.12.0, will be removed in 0.13.0; use {@link SnapshotTable} instead.
  */
-public class Spark3SnapshotAction extends Spark3CreateAction implements SnapshotAction {
-  private static final Logger LOG = LoggerFactory.getLogger(Spark3SnapshotAction.class);
+@Deprecated
+public class Spark3SnapshotAction implements SnapshotAction {
 
-  private String destTableLocation = null;
+  private final SnapshotTable delegate;
 
   public Spark3SnapshotAction(SparkSession spark, CatalogPlugin sourceCatalog,
                               Identifier sourceTableIdent, CatalogPlugin destCatalog,
                               Identifier destTableIdent) {
-    super(spark, sourceCatalog, sourceTableIdent, destCatalog, destTableIdent);
-  }
-
-  private Long doExecute() {
-    LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
-    StagedSparkTable stagedTable = stageDestTable();
-    Table icebergTable = stagedTable.table();
-    // TODO Check table location here against source location
-
-    boolean threw = true;
-    try {
-      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
-      ensureNameMappingPresent(icebergTable);
-
-      String stagingLocation = getMetadataLocation(icebergTable);
-      TableIdentifier v1TableIdentifier = v1SourceTable().identifier();
-      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
-      SparkTableUtil.importSparkTable(spark(), v1TableIdentifier, icebergTable, stagingLocation);
-
-      LOG.info("Committing staged changes to {}", destTableIdent());
-      stagedTable.commitStagedChanges();
-      threw = false;
-    } finally {
-      if (threw) {
-        LOG.error("Error when attempting to commit snapshot changes, rolling back");
 
-        try {
-          stagedTable.abortStagedChanges();
-        } catch (Exception abortException) {
-          LOG.error("Cannot abort staged changes", abortException);
-        }
-      }
-    }
-
-    Snapshot snapshot = icebergTable.currentSnapshot();
-    long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
-    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", numMigratedFiles, destTableIdent());
-    return numMigratedFiles;
+    delegate = new BaseSnapshotTableSparkAction(spark, sourceCatalog, sourceTableIdent, destCatalog, destTableIdent);
   }
 
   @Override
-  public Long execute() {
-    JobGroupInfo info = new JobGroupInfo("SNAPSHOT", "SNAPSHOT", false);
-    return withJobGroupInfo(info, this::doExecute);
+  public SnapshotAction withLocation(String location) {
+    delegate.tableLocation(location);
+    return this;
   }
 
   @Override
-  protected Map<String, String> targetTableProps() {
-    Map<String, String> properties = Maps.newHashMap();
-
-    // copy over relevant source table props
-    properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
-    EXCLUDED_PROPERTIES.forEach(properties::remove);
-
-    // Remove any possible location properties from origin properties
-    properties.remove(LOCATION);
-    properties.remove(TableProperties.WRITE_METADATA_LOCATION);
-    properties.remove(TableProperties.WRITE_NEW_DATA_LOCATION);
-
-    // set default and user-provided props
-    properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
-    properties.putAll(additionalProperties());
-
-    // make sure we mark this table as a snapshot table
-    properties.put(TableProperties.GC_ENABLED, "false");
-    properties.put("snapshot", "true");
-
-    // Don't use the default location for the destination table if an alternate has be set
-    if (destTableLocation != null) {
-      properties.put(LOCATION, destTableLocation);
-    }
-
-    return properties;
+  public SnapshotAction withProperties(Map<String, String> properties) {
+    delegate.tableProperties(properties);
+    return this;
   }
 
   @Override
-  protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
-    // Currently the Import code relies on being able to look up the table in the session code
-    Preconditions.checkArgument(catalog.name().equals("spark_catalog"),
-        "Cannot snapshot a table that isn't in spark_catalog, the session catalog. Found source catalog %s",
-        catalog.name());
-
-    Preconditions.checkArgument(catalog instanceof TableCatalog,
-        "Cannot snapshot a table from a non-table catalog %s. Catalog has class of %s.", catalog.name(),
-        catalog.getClass().toString());
-
-    return (TableCatalog) catalog;
+  public SnapshotAction withProperty(String key, String value) {
+    delegate.tableProperty(key, value);
+    return this;
   }
 
   @Override
-  public SnapshotAction withLocation(String location) {
-    Preconditions.checkArgument(!sourceTableLocation().equals(location),
-        "Cannot create snapshot where destination location is the same as the source location." +
-            " This would cause a mixing of original table created and snapshot created files.");
-    this.destTableLocation = location;
-    return this;
+  public Long execute() {
+    SnapshotTable.Result result = delegate.execute();
+    return result.importedDataFilesCount();
   }
 }
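
Spark3SnapshotAction gets the same facade treatment, with withLocation() mapped
onto the delegate's tableLocation(). A sketch of a legacy call site (the
destination path here is hypothetical):

    Long importedFiles = new Spark3SnapshotAction(spark, srcCatalog, srcIdent, dstCatalog, dstIdent)
        .withLocation("s3://bucket/snapshots/tbl")  // forwards to delegate.tableLocation(...)
        .execute();  // internally: delegate.execute().importedDataFilesCount()
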
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
similarity index 67%
copy from spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
copy to spark3/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
index e2df5ee..021c5e5 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
@@ -17,12 +17,14 @@
  * under the License.
  */
 
-package org.apache.iceberg.actions;
+package org.apache.iceberg.spark.actions;
 
 import java.util.Map;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseMigrateTableActionResult;
+import org.apache.iceberg.actions.MigrateTable;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -35,6 +37,7 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,27 +45,67 @@ import scala.Some;
 import scala.collection.JavaConverters;
 
 /**
- * Takes a Spark table in the sourceCatalog and attempts to transform it into an Iceberg
- * Table in the same location with the same identifier. Once complete the identifier which
- * previously referred to a non-iceberg table will refer to the newly migrated iceberg
+ * Takes a Spark table in the source catalog and attempts to transform it into an Iceberg
+ * table in the same location with the same identifier. Once complete the identifier which
+ * previously referred to a non-Iceberg table will refer to the newly migrated Iceberg
  * table.
  */
-public class Spark3MigrateAction extends Spark3CreateAction {
-  private static final Logger LOG = LoggerFactory.getLogger(Spark3MigrateAction.class);
+public class BaseMigrateTableSparkAction
+    extends BaseTableCreationSparkAction<MigrateTable, MigrateTable.Result>
+    implements MigrateTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseMigrateTableSparkAction.class);
   private static final String BACKUP_SUFFIX = "_BACKUP_";
 
+  private final StagingTableCatalog destCatalog;
+  private final Identifier destTableIdent;
   private final Identifier backupIdent;
 
-  public Spark3MigrateAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableName) {
-    super(spark, sourceCatalog, sourceTableName, sourceCatalog, sourceTableName);
-    String backupName = sourceTableIdent().name() + BACKUP_SUFFIX;
-    this.backupIdent = Identifier.of(sourceTableIdent().namespace(), backupName);
+  public BaseMigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+    super(spark, sourceCatalog, sourceTableIdent);
+    this.destCatalog = checkDestinationCatalog(sourceCatalog);
+    this.destTableIdent = sourceTableIdent;
+    String backupName = sourceTableIdent.name() + BACKUP_SUFFIX;
+    this.backupIdent = Identifier.of(sourceTableIdent.namespace(), backupName);
+  }
+
+  @Override
+  protected MigrateTable self() {
+    return this;
+  }
+
+  @Override
+  protected StagingTableCatalog destCatalog() {
+    return destCatalog;
+  }
+
+  @Override
+  protected Identifier destTableIdent() {
+    return destTableIdent;
+  }
+
+  @Override
+  public MigrateTable tableProperties(Map<String, String> properties) {
+    setProperties(properties);
+    return this;
+  }
+
+  @Override
+  public MigrateTable tableProperty(String property, String value) {
+    setProperty(property, value);
+    return this;
+  }
+
+  @Override
+  public MigrateTable.Result execute() {
+    JobGroupInfo info = newJobGroupInfo("MIGRATE-TABLE", "MIGRATE-TABLE");
+    return withJobGroupInfo(info, this::doExecute);
   }
 
-  private Long doExecute() {
+  private MigrateTable.Result doExecute() {
     LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
 
-    // Move source table to a new name, halting all modifications and allowing us to stage
+    // move the source table to a new name, halting all modifications and allowing us to stage
     // the creation of a new Iceberg table in its place
     renameAndBackupSourceTable();
 
@@ -77,18 +120,18 @@ public class Spark3MigrateAction extends Spark3CreateAction {
       LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
       ensureNameMappingPresent(icebergTable);
 
-      String stagingLocation = getMetadataLocation(icebergTable);
       Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
-      TableIdentifier v1BackupIdentifier = new TableIdentifier(backupIdent.name(), backupNamespace);
+      TableIdentifier v1BackupIdent = new TableIdentifier(backupIdent.name(), backupNamespace);
+      String stagingLocation = getMetadataLocation(icebergTable);
       LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
-      SparkTableUtil.importSparkTable(spark(), v1BackupIdentifier, icebergTable, stagingLocation);
+      SparkTableUtil.importSparkTable(spark(), v1BackupIdent, icebergTable, stagingLocation);
 
       LOG.info("Committing staged changes to {}", destTableIdent());
       stagedTable.commitStagedChanges();
       threw = false;
     } finally {
       if (threw) {
-        LOG.error("Error when attempting perform migration changes, aborting table creation and restoring backup.");
+        LOG.error("Failed to perform the migration, aborting table creation and restoring the original table");
 
         restoreSourceTable();
 
@@ -103,19 +146,13 @@ public class Spark3MigrateAction extends Spark3CreateAction {
     }
 
     Snapshot snapshot = icebergTable.currentSnapshot();
-    long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
-    LOG.info("Successfully loaded Iceberg metadata for {} files", numMigratedFiles);
-    return numMigratedFiles;
-  }
-
-  @Override
-  public Long execute() {
-    JobGroupInfo info = new JobGroupInfo("MIGRATE", "MIGRATE", false);
-    return withJobGroupInfo(info, this::doExecute);
+    long migratedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", migratedDataFilesCount, destTableIdent());
+    return new BaseMigrateTableActionResult(migratedDataFilesCount);
   }
 
   @Override
-  protected Map<String, String> targetTableProps() {
+  protected Map<String, String> destTableProps() {
     Map<String, String> properties = Maps.newHashMap();
 
     // copy over relevant source table props
@@ -137,7 +174,7 @@ public class Spark3MigrateAction extends Spark3CreateAction {
 
   @Override
   protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
-    // Currently the Import code relies on being able to look up the table in the session code
+    // currently the import code relies on being able to look up the table in the session catalog
     Preconditions.checkArgument(catalog instanceof SparkSessionCatalog,
         "Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found %s of class %s as the source catalog.",
         catalog.name(), catalog.getClass().getName());
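
The self() override together with the ThisT parameter on
BaseTableCreationSparkAction is the self-typed builder idiom: shared setters in
the base class can return the concrete action type, so fluent chains stay typed.
The idiom in isolation (class and method names here are illustrative, not from
the diff):

    abstract class FluentBase<ThisT> {
      protected abstract ThisT self();

      ThisT option(String key, String value) {
        // record the option, then hand back the concrete subtype
        return self();
      }
    }

    class ConcreteAction extends FluentBase<ConcreteAction> {
      @Override
      protected ConcreteAction self() {
        return this;  // safe: ConcreteAction binds ThisT to itself
      }
    }
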
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
similarity index 50%
copy from spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
copy to spark3/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
index c4ed14d..4d4f37b 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
@@ -17,22 +17,27 @@
  * under the License.
  */
 
-package org.apache.iceberg.actions;
+package org.apache.iceberg.spark.actions;
 
 import java.util.Map;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseSnapshotTableActionResult;
+import org.apache.iceberg.actions.SnapshotTable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.source.StagedSparkTable;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,39 +48,98 @@ import scala.collection.JavaConverters;
  * have a different data and metadata directory allowing it to exist independently of the
  * source table.
  */
-public class Spark3SnapshotAction extends Spark3CreateAction implements SnapshotAction {
-  private static final Logger LOG = LoggerFactory.getLogger(Spark3SnapshotAction.class);
+public class BaseSnapshotTableSparkAction
+    extends BaseTableCreationSparkAction<SnapshotTable, SnapshotTable.Result>
+    implements SnapshotTable {
 
+  private static final Logger LOG = LoggerFactory.getLogger(BaseSnapshotTableSparkAction.class);
+
+  private StagingTableCatalog destCatalog;
+  private Identifier destTableIdent;
   private String destTableLocation = null;
 
-  public Spark3SnapshotAction(SparkSession spark, CatalogPlugin sourceCatalog,
-                              Identifier sourceTableIdent, CatalogPlugin destCatalog,
-                              Identifier destTableIdent) {
-    super(spark, sourceCatalog, sourceTableIdent, destCatalog, destTableIdent);
+  BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+    super(spark, sourceCatalog, sourceTableIdent);
+  }
+
+  // used by the deprecated Spark3SnapshotAction constructor, which supplies the destination up front
+  public BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent,
+                                      CatalogPlugin destCatalog, Identifier destTableIdent) {
+    super(spark, sourceCatalog, sourceTableIdent);
+    this.destCatalog = checkDestinationCatalog(destCatalog);
+    this.destTableIdent = destTableIdent;
+  }
+
+  @Override
+  protected SnapshotTable self() {
+    return this;
+  }
+
+  @Override
+  protected StagingTableCatalog destCatalog() {
+    return destCatalog;
   }
 
-  private Long doExecute() {
+  @Override
+  protected Identifier destTableIdent() {
+    return destTableIdent;
+  }
+
+  @Override
+  public SnapshotTable as(String ident) {
+    String ctx = "snapshot destination";
+    CatalogPlugin defaultCatalog = spark().sessionState().catalogManager().currentCatalog();
+    CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark(), ident, defaultCatalog);
+    this.destCatalog = checkDestinationCatalog(catalogAndIdent.catalog());
+    this.destTableIdent = catalogAndIdent.identifier();
+    return this;
+  }
+
+  @Override
+  public SnapshotTable tableProperties(Map<String, String> properties) {
+    setProperties(properties);
+    return this;
+  }
+
+  @Override
+  public SnapshotTable tableProperty(String property, String value) {
+    setProperty(property, value);
+    return this;
+  }
+
+  @Override
+  public SnapshotTable.Result execute() {
+    JobGroupInfo info = newJobGroupInfo("SNAPSHOT-TABLE", "SNAPSHOT-TABLE");
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private SnapshotTable.Result doExecute() {
+    Preconditions.checkArgument(destCatalog() != null && destTableIdent() != null,
+        "The destination catalog and identifier cannot be null. " +
+        "Make sure to configure the action with a valid destination table identifier via the `as` method.");
+
     LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
     StagedSparkTable stagedTable = stageDestTable();
     Table icebergTable = stagedTable.table();
-    // TODO Check table location here against source location
+
+    // TODO: Check the dest table location does not overlap with the source table location
 
     boolean threw = true;
     try {
       LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
       ensureNameMappingPresent(icebergTable);
 
+      TableIdentifier v1TableIdent = v1SourceTable().identifier();
       String stagingLocation = getMetadataLocation(icebergTable);
-      TableIdentifier v1TableIdentifier = v1SourceTable().identifier();
       LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
-      SparkTableUtil.importSparkTable(spark(), v1TableIdentifier, icebergTable, stagingLocation);
+      SparkTableUtil.importSparkTable(spark(), v1TableIdent, icebergTable, stagingLocation);
 
       LOG.info("Committing staged changes to {}", destTableIdent());
       stagedTable.commitStagedChanges();
       threw = false;
     } finally {
       if (threw) {
-        LOG.error("Error when attempting to commit snapshot changes, rolling back");
+        LOG.error("Error when populating the staged table with metadata, aborting changes");
 
         try {
           stagedTable.abortStagedChanges();
@@ -86,26 +150,20 @@ public class Spark3SnapshotAction extends Spark3CreateAction implements Snapshot
     }
 
     Snapshot snapshot = icebergTable.currentSnapshot();
-    long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
-    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", numMigratedFiles, destTableIdent());
-    return numMigratedFiles;
-  }
-
-  @Override
-  public Long execute() {
-    JobGroupInfo info = new JobGroupInfo("SNAPSHOT", "SNAPSHOT", false);
-    return withJobGroupInfo(info, this::doExecute);
+    long importedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", importedDataFilesCount, destTableIdent());
+    return new BaseSnapshotTableActionResult(importedDataFilesCount);
   }
 
   @Override
-  protected Map<String, String> targetTableProps() {
+  protected Map<String, String> destTableProps() {
     Map<String, String> properties = Maps.newHashMap();
 
     // copy over relevant source table props
     properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
     EXCLUDED_PROPERTIES.forEach(properties::remove);
 
-    // Remove any possible location properties from origin properties
+    // remove any possible location properties from origin properties
     properties.remove(LOCATION);
     properties.remove(TableProperties.WRITE_METADATA_LOCATION);
     properties.remove(TableProperties.WRITE_NEW_DATA_LOCATION);
@@ -118,7 +176,7 @@ public class Spark3SnapshotAction extends Spark3CreateAction implements Snapshot
     properties.put(TableProperties.GC_ENABLED, "false");
     properties.put("snapshot", "true");
 
-    // Don't use the default location for the destination table if an alternate has be set
+    // set the destination table location if provided
     if (destTableLocation != null) {
       properties.put(LOCATION, destTableLocation);
     }
@@ -128,23 +186,23 @@ public class Spark3SnapshotAction extends Spark3CreateAction implements Snapshot
 
   @Override
   protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
-    // Currently the Import code relies on being able to look up the table in the session code
-    Preconditions.checkArgument(catalog.name().equals("spark_catalog"),
-        "Cannot snapshot a table that isn't in spark_catalog, the session catalog. Found source catalog %s",
-        catalog.name());
+    // currently the import code relies on being able to look up the table in the session catalog
+    Preconditions.checkArgument(catalog.name().equalsIgnoreCase("spark_catalog"),
+        "Cannot snapshot a table that isn't in the session catalog (i.e. spark_catalog). " +
+        "Found source catalog: %s.", catalog.name());
 
     Preconditions.checkArgument(catalog instanceof TableCatalog,
-        "Cannot snapshot a table from a non-table catalog %s. Catalog has class of %s.", catalog.name(),
-        catalog.getClass().toString());
+        "Cannot snapshot as catalog %s of class %s in not a table catalog",
+        catalog.name(), catalog.getClass().getName());
 
     return (TableCatalog) catalog;
   }
 
   @Override
-  public SnapshotAction withLocation(String location) {
+  public SnapshotTable tableLocation(String location) {
     Preconditions.checkArgument(!sourceTableLocation().equals(location),
-        "Cannot create snapshot where destination location is the same as the source location." +
-            " This would cause a mixing of original table created and snapshot created files.");
+        "The snapshot table location cannot be same as the source table location. " +
+        "This would mix snapshot table files with original table files.");
     this.destTableLocation = location;
     return this;
   }
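
The new as(String) method lets the destination be supplied after construction:
the string is resolved to a catalog and identifier via
Spark3Util.catalogAndIdentifier against the current session catalog. Since the
short constructor is package-private, here is a sketch of the intended flow
(identifier and location strings are hypothetical, and code in the
org.apache.iceberg.spark.actions package is assumed):

    SnapshotTable.Result result =
        new BaseSnapshotTableSparkAction(spark, sourceCatalog, sourceTableIdent)
            .as("hive_prod.db.tbl_snapshot")  // resolved via Spark3Util.catalogAndIdentifier
            .tableLocation("s3://bucket/snapshots/tbl")
            .execute();
    long importedFiles = result.importedDataFilesCount();
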
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3CreateAction.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
similarity index 74%
rename from spark3/src/main/java/org/apache/iceberg/actions/Spark3CreateAction.java
rename to spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
index a87776d..6eadece 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3CreateAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
@@ -17,13 +17,12 @@
  * under the License.
  */
 
-package org.apache.iceberg.actions;
+package org.apache.iceberg.spark.actions;
 
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Supplier;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -36,12 +35,9 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.JobGroupUtils;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.iceberg.spark.source.StagedSparkTable;
-import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.catalog.CatalogTable;
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils;
@@ -53,15 +49,13 @@ import org.apache.spark.sql.connector.catalog.V1Table;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 
-abstract class Spark3CreateAction implements CreateAction {
+abstract class BaseTableCreationSparkAction<ThisT, R> extends BaseSparkAction<ThisT, R> {
   private static final Set<String> ALLOWED_SOURCES = ImmutableSet.of("parquet", "avro", "orc", "hive");
   protected static final String LOCATION = "location";
   protected static final String ICEBERG_METADATA_FOLDER = "metadata";
   protected static final List<String> EXCLUDED_PROPERTIES =
       ImmutableList.of("path", "transient_lastDdlTime", "serialization.format");
 
-  private final SparkSession spark;
-
   // Source Fields
   private final V1Table sourceTable;
   private final CatalogTable sourceCatalogTable;
@@ -69,21 +63,14 @@ abstract class Spark3CreateAction implements CreateAction {
   private final TableCatalog sourceCatalog;
   private final Identifier sourceTableIdent;
 
-  // Destination Fields
-  private final StagingTableCatalog destCatalog;
-  private final Identifier destTableIdent;
-
   // Optional Parameters for destination
   private final Map<String, String> additionalProperties = Maps.newHashMap();
 
-  Spark3CreateAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent,
-                     CatalogPlugin destCatalog, Identifier destTableIdent) {
+  BaseTableCreationSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+    super(spark);
 
-    this.spark = spark;
     this.sourceCatalog = checkSourceCatalog(sourceCatalog);
     this.sourceTableIdent = sourceTableIdent;
-    this.destCatalog = checkDestinationCatalog(destCatalog);
-    this.destTableIdent = destTableIdent;
 
     try {
       this.sourceTable = (V1Table) this.sourceCatalog.loadTable(sourceTableIdent);
@@ -99,21 +86,13 @@ abstract class Spark3CreateAction implements CreateAction {
     this.sourceTableLocation = CatalogUtils.URIToString(sourceCatalogTable.storage().locationUri().get());
   }
 
-  @Override
-  public CreateAction withProperties(Map<String, String> properties) {
-    this.additionalProperties.putAll(properties);
-    return this;
-  }
+  protected abstract TableCatalog checkSourceCatalog(CatalogPlugin catalog);
 
-  @Override
-  public CreateAction withProperty(String key, String value) {
-    this.additionalProperties.put(key, value);
-    return this;
-  }
+  protected abstract StagingTableCatalog destCatalog();
 
-  protected SparkSession spark() {
-    return spark;
-  }
+  protected abstract Identifier destTableIdent();
+
+  protected abstract Map<String, String> destTableProps();
 
   protected String sourceTableLocation() {
     return sourceTableLocation;
@@ -131,12 +110,12 @@ abstract class Spark3CreateAction implements CreateAction {
     return sourceTableIdent;
   }
 
-  protected StagingTableCatalog destCatalog() {
-    return destCatalog;
+  protected void setProperties(Map<String, String> properties) {
+    additionalProperties.putAll(properties);
   }
 
-  protected Identifier destTableIdent() {
-    return destTableIdent;
+  protected void setProperty(String key, String value) {
+    additionalProperties.put(key, value);
   }
 
   protected Map<String, String> additionalProperties() {
@@ -151,7 +130,7 @@ abstract class Spark3CreateAction implements CreateAction {
         "Cannot create an Iceberg table from a source without an explicit location");
   }
 
-  private StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) {
+  protected StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) {
     Preconditions.checkArgument(catalog instanceof SparkSessionCatalog || catalog instanceof SparkCatalog,
         "Cannot create Iceberg table in non-Iceberg Catalog. " +
             "Catalog '%s' was of class '%s' but '%s' or '%s' are required",
@@ -163,15 +142,14 @@ abstract class Spark3CreateAction implements CreateAction {
 
   protected StagedSparkTable stageDestTable() {
     try {
-      Map<String, String> props = targetTableProps();
+      Map<String, String> props = destTableProps();
       StructType schema = sourceTable.schema();
       Transform[] partitioning = sourceTable.partitioning();
-      return (StagedSparkTable) destCatalog.stageCreate(destTableIdent, schema, partitioning, props);
+      return (StagedSparkTable) destCatalog().stageCreate(destTableIdent(), schema, partitioning, props);
     } catch (org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException e) {
-      throw new NoSuchNamespaceException("Cannot create a table '%s' because the namespace does not exist",
-          destTableIdent);
+      throw new NoSuchNamespaceException("Cannot create table %s as the namespace does not exist", destTableIdent());
     } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
-      throw new AlreadyExistsException("Cannot create table '%s' because it already exists", destTableIdent);
+      throw new AlreadyExistsException("Cannot create table %s as it already exists", destTableIdent());
     }
   }
 
@@ -187,20 +165,4 @@ abstract class Spark3CreateAction implements CreateAction {
     return table.properties().getOrDefault(TableProperties.WRITE_METADATA_LOCATION,
         table.location() + "/" + ICEBERG_METADATA_FOLDER);
   }
-
-  protected abstract Map<String, String> targetTableProps();
-
-  protected abstract TableCatalog checkSourceCatalog(CatalogPlugin catalog);
-
-  protected <T> T withJobGroupInfo(JobGroupInfo info, Supplier<T> supplier) {
-    SparkContext context = spark().sparkContext();
-    JobGroupInfo previousInfo = JobGroupUtils.getJobGroupInfo(context);
-    try {
-      JobGroupUtils.setJobGroupInfo(context, info);
-      return supplier.get();
-    } finally {
-      JobGroupUtils.setJobGroupInfo(context, previousInfo);
-    }
-  }
-
 }
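
The withJobGroupInfo() helper removed here is now inherited from BaseSparkAction
(note the new superclass and the newJobGroupInfo calls in the actions above).
The removed implementation shows the pattern it carries over: save the current
Spark job group, set the action's own, and restore the previous one even on
failure, so the Spark UI attributes jobs to the right action:

    protected <T> T withJobGroupInfo(JobGroupInfo info, Supplier<T> body) {
      SparkContext context = spark().sparkContext();
      JobGroupInfo previousInfo = JobGroupUtils.getJobGroupInfo(context);
      try {
        JobGroupUtils.setJobGroupInfo(context, info);
        return body.get();
      } finally {
        JobGroupUtils.setJobGroupInfo(context, previousInfo);  // restore on all paths
      }
    }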