You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/13 04:03:06 UTC
[iceberg] branch master updated: Spark: Refactor snapshot and migrate actions (#2437)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6eaabb5 Spark: Refactor snapshot and migrate actions (#2437)
6eaabb5 is described below
commit 6eaabb59653f51bdfbb4063ebe14632b1f044d96
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Apr 12 21:02:48 2021 -0700
Spark: Refactor snapshot and migrate actions (#2437)
---
.../actions/BaseMigrateTableActionResult.java | 14 +-
.../actions/BaseSnapshotTableActionResult.java | 14 +-
.../org/apache/iceberg/actions/CreateAction.java | 1 +
.../org/apache/iceberg/actions/SnapshotAction.java | 1 +
.../iceberg/actions/Spark3MigrateAction.java | 146 +++------------------
.../iceberg/actions/Spark3SnapshotAction.java | 119 +++--------------
.../actions/BaseMigrateTableSparkAction.java} | 91 +++++++++----
.../actions/BaseSnapshotTableSparkAction.java} | 124 ++++++++++++-----
.../actions/BaseTableCreationSparkAction.java} | 74 +++--------
9 files changed, 234 insertions(+), 350 deletions(-)
diff --git a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java
similarity index 71%
copy from spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
copy to core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java
index cb1a559..4d7397b 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java
@@ -19,6 +19,16 @@
package org.apache.iceberg.actions;
-public interface SnapshotAction extends CreateAction {
- SnapshotAction withLocation(String location);
+public class BaseMigrateTableActionResult implements MigrateTable.Result {
+
+ private final long migratedDataFilesCount;
+
+ public BaseMigrateTableActionResult(long migratedDataFilesCount) {
+ this.migratedDataFilesCount = migratedDataFilesCount;
+ }
+
+ @Override
+ public long migratedDataFilesCount() {
+ return migratedDataFilesCount;
+ }
}
diff --git a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java
similarity index 70%
copy from spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
copy to core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java
index cb1a559..3ea24d3 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java
@@ -19,6 +19,16 @@
package org.apache.iceberg.actions;
-public interface SnapshotAction extends CreateAction {
- SnapshotAction withLocation(String location);
+public class BaseSnapshotTableActionResult implements SnapshotTable.Result {
+
+ private final long importedDataFilesCount;
+
+ public BaseSnapshotTableActionResult(long importedDataFilesCount) {
+ this.importedDataFilesCount = importedDataFilesCount;
+ }
+
+ @Override
+ public long importedDataFilesCount() {
+ return importedDataFilesCount;
+ }
}
diff --git a/spark/src/main/java/org/apache/iceberg/actions/CreateAction.java b/spark/src/main/java/org/apache/iceberg/actions/CreateAction.java
index 6760dd1..a567bda 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/CreateAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/CreateAction.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.actions;
import java.util.Map;
+@Deprecated
public interface CreateAction extends Action<CreateAction, Long> {
/**
diff --git a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java b/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
index cb1a559..6c3e8e6 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/SnapshotAction.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.actions;
+@Deprecated
public interface SnapshotAction extends CreateAction {
SnapshotAction withLocation(String location);
}
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java b/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
index e2df5ee..b5c777d 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
@@ -20,157 +20,43 @@
package org.apache.iceberg.actions;
import java.util.Map;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.AlreadyExistsException;
-import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.SparkSessionCatalog;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.spark.actions.BaseMigrateTableSparkAction;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Some;
-import scala.collection.JavaConverters;
/**
* Takes a Spark table in the sourceCatalog and attempts to transform it into an Iceberg
* Table in the same location with the same identifier. Once complete the identifier which
* previously referred to a non-iceberg table will refer to the newly migrated iceberg
* table.
+ *
+ * @deprecated since 0.12.0, will be removed in 0.13.0; use {@link MigrateTable} instead.
*/
-public class Spark3MigrateAction extends Spark3CreateAction {
- private static final Logger LOG = LoggerFactory.getLogger(Spark3MigrateAction.class);
- private static final String BACKUP_SUFFIX = "_BACKUP_";
+@Deprecated
+public class Spark3MigrateAction implements CreateAction {
- private final Identifier backupIdent;
+ private final MigrateTable delegate;
public Spark3MigrateAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableName) {
- super(spark, sourceCatalog, sourceTableName, sourceCatalog, sourceTableName);
- String backupName = sourceTableIdent().name() + BACKUP_SUFFIX;
- this.backupIdent = Identifier.of(sourceTableIdent().namespace(), backupName);
- }
-
- private Long doExecute() {
- LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
-
- // Move source table to a new name, halting all modifications and allowing us to stage
- // the creation of a new Iceberg table in its place
- renameAndBackupSourceTable();
-
- StagedSparkTable stagedTable = null;
- Table icebergTable;
- boolean threw = true;
- try {
- LOG.info("Staging a new Iceberg table {}", destTableIdent());
- stagedTable = stageDestTable();
- icebergTable = stagedTable.table();
-
- LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
- ensureNameMappingPresent(icebergTable);
-
- String stagingLocation = getMetadataLocation(icebergTable);
- Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
- TableIdentifier v1BackupIdentifier = new TableIdentifier(backupIdent.name(), backupNamespace);
- LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
- SparkTableUtil.importSparkTable(spark(), v1BackupIdentifier, icebergTable, stagingLocation);
-
- LOG.info("Committing staged changes to {}", destTableIdent());
- stagedTable.commitStagedChanges();
- threw = false;
- } finally {
- if (threw) {
- LOG.error("Error when attempting perform migration changes, aborting table creation and restoring backup.");
-
- restoreSourceTable();
-
- if (stagedTable != null) {
- try {
- stagedTable.abortStagedChanges();
- } catch (Exception abortException) {
- LOG.error("Cannot abort staged changes", abortException);
- }
- }
- }
- }
-
- Snapshot snapshot = icebergTable.currentSnapshot();
- long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
- LOG.info("Successfully loaded Iceberg metadata for {} files", numMigratedFiles);
- return numMigratedFiles;
+ this.delegate = new BaseMigrateTableSparkAction(spark, sourceCatalog, sourceTableName);
}
@Override
- public Long execute() {
- JobGroupInfo info = new JobGroupInfo("MIGRATE", "MIGRATE", false);
- return withJobGroupInfo(info, this::doExecute);
+ public CreateAction withProperties(Map<String, String> properties) {
+ delegate.tableProperties(properties);
+ return this;
}
@Override
- protected Map<String, String> targetTableProps() {
- Map<String, String> properties = Maps.newHashMap();
-
- // copy over relevant source table props
- properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
- EXCLUDED_PROPERTIES.forEach(properties::remove);
-
- // set default and user-provided props
- properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
- properties.putAll(additionalProperties());
-
- // make sure we mark this table as migrated
- properties.put("migrated", "true");
-
- // inherit the source table location
- properties.putIfAbsent(LOCATION, sourceTableLocation());
-
- return properties;
+ public CreateAction withProperty(String key, String value) {
+ delegate.tableProperty(key, value);
+ return this;
}
@Override
- protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
- // Currently the Import code relies on being able to look up the table in the session code
- Preconditions.checkArgument(catalog instanceof SparkSessionCatalog,
- "Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found %s of class %s as the source catalog.",
- catalog.name(), catalog.getClass().getName());
-
- return (TableCatalog) catalog;
- }
-
- private void renameAndBackupSourceTable() {
- try {
- LOG.info("Renaming {} as {} for backup", sourceTableIdent(), backupIdent);
- destCatalog().renameTable(sourceTableIdent(), backupIdent);
-
- } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
- throw new NoSuchTableException("Cannot find source table %s", sourceTableIdent());
-
- } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
- throw new AlreadyExistsException(
- "Cannot rename %s as %s for backup. The backup table already exists.",
- sourceTableIdent(), backupIdent);
- }
- }
-
- private void restoreSourceTable() {
- try {
- LOG.info("Restoring {} from {}", sourceTableIdent(), backupIdent);
- destCatalog().renameTable(backupIdent, sourceTableIdent());
-
- } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
- LOG.error("Cannot restore the original table, the backup table {} cannot be found", backupIdent, e);
-
- } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
- LOG.error("Cannot restore the original table, a table with the original name exists. " +
- "Use the backup table {} to restore the original table manually.", backupIdent, e);
- }
+ public Long execute() {
+ MigrateTable.Result result = delegate.execute();
+ return result.migratedDataFilesCount();
}
}
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java b/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
index c4ed14d..9948350 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
@@ -20,132 +20,51 @@
package org.apache.iceberg.actions;
import java.util.Map;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.spark.actions.BaseSnapshotTableSparkAction;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
/**
* Creates a new Iceberg table based on a source Spark table. The new Iceberg table will
* have a different data and metadata directory allowing it to exist independently of the
* source table.
+ *
+ * @deprecated since 0.12.0, will be removed in 0.13.0; use {@link SnapshotTable} instead.
*/
-public class Spark3SnapshotAction extends Spark3CreateAction implements SnapshotAction {
- private static final Logger LOG = LoggerFactory.getLogger(Spark3SnapshotAction.class);
+@Deprecated
+public class Spark3SnapshotAction implements SnapshotAction {
- private String destTableLocation = null;
+ private final SnapshotTable delegate;
public Spark3SnapshotAction(SparkSession spark, CatalogPlugin sourceCatalog,
Identifier sourceTableIdent, CatalogPlugin destCatalog,
Identifier destTableIdent) {
- super(spark, sourceCatalog, sourceTableIdent, destCatalog, destTableIdent);
- }
-
- private Long doExecute() {
- LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
- StagedSparkTable stagedTable = stageDestTable();
- Table icebergTable = stagedTable.table();
- // TODO Check table location here against source location
-
- boolean threw = true;
- try {
- LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
- ensureNameMappingPresent(icebergTable);
-
- String stagingLocation = getMetadataLocation(icebergTable);
- TableIdentifier v1TableIdentifier = v1SourceTable().identifier();
- LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
- SparkTableUtil.importSparkTable(spark(), v1TableIdentifier, icebergTable, stagingLocation);
-
- LOG.info("Committing staged changes to {}", destTableIdent());
- stagedTable.commitStagedChanges();
- threw = false;
- } finally {
- if (threw) {
- LOG.error("Error when attempting to commit snapshot changes, rolling back");
- try {
- stagedTable.abortStagedChanges();
- } catch (Exception abortException) {
- LOG.error("Cannot abort staged changes", abortException);
- }
- }
- }
-
- Snapshot snapshot = icebergTable.currentSnapshot();
- long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
- LOG.info("Successfully loaded Iceberg metadata for {} files to {}", numMigratedFiles, destTableIdent());
- return numMigratedFiles;
+ delegate = new BaseSnapshotTableSparkAction(spark, sourceCatalog, sourceTableIdent, destCatalog, destTableIdent);
}
@Override
- public Long execute() {
- JobGroupInfo info = new JobGroupInfo("SNAPSHOT", "SNAPSHOT", false);
- return withJobGroupInfo(info, this::doExecute);
+ public SnapshotAction withLocation(String location) {
+ delegate.tableLocation(location);
+ return this;
}
@Override
- protected Map<String, String> targetTableProps() {
- Map<String, String> properties = Maps.newHashMap();
-
- // copy over relevant source table props
- properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
- EXCLUDED_PROPERTIES.forEach(properties::remove);
-
- // Remove any possible location properties from origin properties
- properties.remove(LOCATION);
- properties.remove(TableProperties.WRITE_METADATA_LOCATION);
- properties.remove(TableProperties.WRITE_NEW_DATA_LOCATION);
-
- // set default and user-provided props
- properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
- properties.putAll(additionalProperties());
-
- // make sure we mark this table as a snapshot table
- properties.put(TableProperties.GC_ENABLED, "false");
- properties.put("snapshot", "true");
-
- // Don't use the default location for the destination table if an alternate has be set
- if (destTableLocation != null) {
- properties.put(LOCATION, destTableLocation);
- }
-
- return properties;
+ public SnapshotAction withProperties(Map<String, String> properties) {
+ delegate.tableProperties(properties);
+ return this;
}
@Override
- protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
- // Currently the Import code relies on being able to look up the table in the session code
- Preconditions.checkArgument(catalog.name().equals("spark_catalog"),
- "Cannot snapshot a table that isn't in spark_catalog, the session catalog. Found source catalog %s",
- catalog.name());
-
- Preconditions.checkArgument(catalog instanceof TableCatalog,
- "Cannot snapshot a table from a non-table catalog %s. Catalog has class of %s.", catalog.name(),
- catalog.getClass().toString());
-
- return (TableCatalog) catalog;
+ public SnapshotAction withProperty(String key, String value) {
+ delegate.tableProperty(key, value);
+ return this;
}
@Override
- public SnapshotAction withLocation(String location) {
- Preconditions.checkArgument(!sourceTableLocation().equals(location),
- "Cannot create snapshot where destination location is the same as the source location." +
- " This would cause a mixing of original table created and snapshot created files.");
- this.destTableLocation = location;
- return this;
+ public Long execute() {
+ SnapshotTable.Result result = delegate.execute();
+ return result.importedDataFilesCount();
}
}
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
similarity index 67%
copy from spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
copy to spark3/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
index e2df5ee..021c5e5 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
@@ -17,12 +17,14 @@
* under the License.
*/
-package org.apache.iceberg.actions;
+package org.apache.iceberg.spark.actions;
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseMigrateTableActionResult;
+import org.apache.iceberg.actions.MigrateTable;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -35,6 +37,7 @@ import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,27 +45,67 @@ import scala.Some;
import scala.collection.JavaConverters;
/**
- * Takes a Spark table in the sourceCatalog and attempts to transform it into an Iceberg
- * Table in the same location with the same identifier. Once complete the identifier which
- * previously referred to a non-iceberg table will refer to the newly migrated iceberg
+ * Takes a Spark table in the source catalog and attempts to transform it into an Iceberg
+ * table in the same location with the same identifier. Once complete the identifier which
+ * previously referred to a non-Iceberg table will refer to the newly migrated Iceberg
* table.
*/
-public class Spark3MigrateAction extends Spark3CreateAction {
- private static final Logger LOG = LoggerFactory.getLogger(Spark3MigrateAction.class);
+public class BaseMigrateTableSparkAction
+ extends BaseTableCreationSparkAction<MigrateTable, MigrateTable.Result>
+ implements MigrateTable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseMigrateTableSparkAction.class);
private static final String BACKUP_SUFFIX = "_BACKUP_";
+ private final StagingTableCatalog destCatalog;
+ private final Identifier destTableIdent;
private final Identifier backupIdent;
- public Spark3MigrateAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableName) {
- super(spark, sourceCatalog, sourceTableName, sourceCatalog, sourceTableName);
- String backupName = sourceTableIdent().name() + BACKUP_SUFFIX;
- this.backupIdent = Identifier.of(sourceTableIdent().namespace(), backupName);
+ public BaseMigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+ super(spark, sourceCatalog, sourceTableIdent);
+ this.destCatalog = checkDestinationCatalog(sourceCatalog);
+ this.destTableIdent = sourceTableIdent;
+ String backupName = sourceTableIdent.name() + BACKUP_SUFFIX;
+ this.backupIdent = Identifier.of(sourceTableIdent.namespace(), backupName);
+ }
+
+ @Override
+ protected MigrateTable self() {
+ return this;
+ }
+
+ @Override
+ protected StagingTableCatalog destCatalog() {
+ return destCatalog;
+ }
+
+ @Override
+ protected Identifier destTableIdent() {
+ return destTableIdent;
+ }
+
+ @Override
+ public MigrateTable tableProperties(Map<String, String> properties) {
+ setProperties(properties);
+ return this;
+ }
+
+ @Override
+ public MigrateTable tableProperty(String property, String value) {
+ setProperty(property, value);
+ return this;
+ }
+
+ @Override
+ public MigrateTable.Result execute() {
+ JobGroupInfo info = newJobGroupInfo("MIGRATE-TABLE", "MIGRATE-TABLE");
+ return withJobGroupInfo(info, this::doExecute);
}
- private Long doExecute() {
+ private MigrateTable.Result doExecute() {
LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
- // Move source table to a new name, halting all modifications and allowing us to stage
+ // move the source table to a new name, halting all modifications and allowing us to stage
// the creation of a new Iceberg table in its place
renameAndBackupSourceTable();
@@ -77,18 +120,18 @@ public class Spark3MigrateAction extends Spark3CreateAction {
LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
ensureNameMappingPresent(icebergTable);
- String stagingLocation = getMetadataLocation(icebergTable);
Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
- TableIdentifier v1BackupIdentifier = new TableIdentifier(backupIdent.name(), backupNamespace);
+ TableIdentifier v1BackupIdent = new TableIdentifier(backupIdent.name(), backupNamespace);
+ String stagingLocation = getMetadataLocation(icebergTable);
LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
- SparkTableUtil.importSparkTable(spark(), v1BackupIdentifier, icebergTable, stagingLocation);
+ SparkTableUtil.importSparkTable(spark(), v1BackupIdent, icebergTable, stagingLocation);
LOG.info("Committing staged changes to {}", destTableIdent());
stagedTable.commitStagedChanges();
threw = false;
} finally {
if (threw) {
- LOG.error("Error when attempting perform migration changes, aborting table creation and restoring backup.");
+ LOG.error("Failed to perform the migration, aborting table creation and restoring the original table");
restoreSourceTable();
@@ -103,19 +146,13 @@ public class Spark3MigrateAction extends Spark3CreateAction {
}
Snapshot snapshot = icebergTable.currentSnapshot();
- long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
- LOG.info("Successfully loaded Iceberg metadata for {} files", numMigratedFiles);
- return numMigratedFiles;
- }
-
- @Override
- public Long execute() {
- JobGroupInfo info = new JobGroupInfo("MIGRATE", "MIGRATE", false);
- return withJobGroupInfo(info, this::doExecute);
+ long migratedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+ LOG.info("Successfully loaded Iceberg metadata for {} files to {}", migratedDataFilesCount, destTableIdent());
+ return new BaseMigrateTableActionResult(migratedDataFilesCount);
}
@Override
- protected Map<String, String> targetTableProps() {
+ protected Map<String, String> destTableProps() {
Map<String, String> properties = Maps.newHashMap();
// copy over relevant source table props
@@ -137,7 +174,7 @@ public class Spark3MigrateAction extends Spark3CreateAction {
@Override
protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
- // Currently the Import code relies on being able to look up the table in the session code
+ // currently the import code relies on being able to look up the table in the session catalog
Preconditions.checkArgument(catalog instanceof SparkSessionCatalog,
"Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found %s of class %s as the source catalog.",
catalog.name(), catalog.getClass().getName());
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
similarity index 50%
copy from spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
copy to spark3/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
index c4ed14d..4d4f37b 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
@@ -17,22 +17,27 @@
* under the License.
*/
-package org.apache.iceberg.actions;
+package org.apache.iceberg.spark.actions;
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseSnapshotTableActionResult;
+import org.apache.iceberg.actions.SnapshotTable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.source.StagedSparkTable;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,39 +48,98 @@ import scala.collection.JavaConverters;
* have a different data and metadata directory allowing it to exist independently of the
* source table.
*/
-public class Spark3SnapshotAction extends Spark3CreateAction implements SnapshotAction {
- private static final Logger LOG = LoggerFactory.getLogger(Spark3SnapshotAction.class);
+public class BaseSnapshotTableSparkAction
+ extends BaseTableCreationSparkAction<SnapshotTable, SnapshotTable.Result>
+ implements SnapshotTable {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseSnapshotTableSparkAction.class);
+
+ private StagingTableCatalog destCatalog;
+ private Identifier destTableIdent;
private String destTableLocation = null;
- public Spark3SnapshotAction(SparkSession spark, CatalogPlugin sourceCatalog,
- Identifier sourceTableIdent, CatalogPlugin destCatalog,
- Identifier destTableIdent) {
- super(spark, sourceCatalog, sourceTableIdent, destCatalog, destTableIdent);
+ BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+ super(spark, sourceCatalog, sourceTableIdent);
+ }
+
+ // used by the old constructor
+ public BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent,
+ CatalogPlugin destCatalog, Identifier destTableIdent) {
+ super(spark, sourceCatalog, sourceTableIdent);
+ this.destCatalog = checkDestinationCatalog(destCatalog);
+ this.destTableIdent = destTableIdent;
+ }
+
+ @Override
+ protected SnapshotTable self() {
+ return this;
+ }
+
+ @Override
+ protected StagingTableCatalog destCatalog() {
+ return destCatalog;
}
- private Long doExecute() {
+ @Override
+ protected Identifier destTableIdent() {
+ return destTableIdent;
+ }
+
+ @Override
+ public SnapshotTable as(String ident) {
+ String ctx = "snapshot destination";
+ CatalogPlugin defaultCatalog = spark().sessionState().catalogManager().currentCatalog();
+ CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark(), ident, defaultCatalog);
+ this.destCatalog = checkDestinationCatalog(catalogAndIdent.catalog());
+ this.destTableIdent = catalogAndIdent.identifier();
+ return this;
+ }
+
+ @Override
+ public SnapshotTable tableProperties(Map<String, String> properties) {
+ setProperties(properties);
+ return this;
+ }
+
+ @Override
+ public SnapshotTable tableProperty(String property, String value) {
+ setProperty(property, value);
+ return this;
+ }
+
+ @Override
+ public SnapshotTable.Result execute() {
+ JobGroupInfo info = newJobGroupInfo("SNAPSHOT-TABLE", "SNAPSHOT-TABLE");
+ return withJobGroupInfo(info, this::doExecute);
+ }
+
+ private SnapshotTable.Result doExecute() {
+ Preconditions.checkArgument(destCatalog() != null && destTableIdent() != null,
+ "The destination catalog and identifier cannot be null. " +
+ "Make sure to configure the action with a valid destination table identifier via the `as` method.");
+
LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
StagedSparkTable stagedTable = stageDestTable();
Table icebergTable = stagedTable.table();
- // TODO Check table location here against source location
+
+ // TODO: Check the dest table location does not overlap with the source table location
boolean threw = true;
try {
LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
ensureNameMappingPresent(icebergTable);
+ TableIdentifier v1TableIdent = v1SourceTable().identifier();
String stagingLocation = getMetadataLocation(icebergTable);
- TableIdentifier v1TableIdentifier = v1SourceTable().identifier();
LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
- SparkTableUtil.importSparkTable(spark(), v1TableIdentifier, icebergTable, stagingLocation);
+ SparkTableUtil.importSparkTable(spark(), v1TableIdent, icebergTable, stagingLocation);
LOG.info("Committing staged changes to {}", destTableIdent());
stagedTable.commitStagedChanges();
threw = false;
} finally {
if (threw) {
- LOG.error("Error when attempting to commit snapshot changes, rolling back");
+ LOG.error("Error when populating the staged table with metadata, aborting changes");
try {
stagedTable.abortStagedChanges();
@@ -86,26 +150,20 @@ public class Spark3SnapshotAction extends Spark3CreateAction implements Snapshot
}
Snapshot snapshot = icebergTable.currentSnapshot();
- long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
- LOG.info("Successfully loaded Iceberg metadata for {} files to {}", numMigratedFiles, destTableIdent());
- return numMigratedFiles;
- }
-
- @Override
- public Long execute() {
- JobGroupInfo info = new JobGroupInfo("SNAPSHOT", "SNAPSHOT", false);
- return withJobGroupInfo(info, this::doExecute);
+ long importedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+ LOG.info("Successfully loaded Iceberg metadata for {} files to {}", importedDataFilesCount, destTableIdent());
+ return new BaseSnapshotTableActionResult(importedDataFilesCount);
}
@Override
- protected Map<String, String> targetTableProps() {
+ protected Map<String, String> destTableProps() {
Map<String, String> properties = Maps.newHashMap();
// copy over relevant source table props
properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
EXCLUDED_PROPERTIES.forEach(properties::remove);
- // Remove any possible location properties from origin properties
+ // remove any possible location properties from origin properties
properties.remove(LOCATION);
properties.remove(TableProperties.WRITE_METADATA_LOCATION);
properties.remove(TableProperties.WRITE_NEW_DATA_LOCATION);
@@ -118,7 +176,7 @@ public class Spark3SnapshotAction extends Spark3CreateAction implements Snapshot
properties.put(TableProperties.GC_ENABLED, "false");
properties.put("snapshot", "true");
- // Don't use the default location for the destination table if an alternate has be set
+ // set the destination table location if provided
if (destTableLocation != null) {
properties.put(LOCATION, destTableLocation);
}
@@ -128,23 +186,23 @@ public class Spark3SnapshotAction extends Spark3CreateAction implements Snapshot
@Override
protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
- // Currently the Import code relies on being able to look up the table in the session code
- Preconditions.checkArgument(catalog.name().equals("spark_catalog"),
- "Cannot snapshot a table that isn't in spark_catalog, the session catalog. Found source catalog %s",
- catalog.name());
+ // currently the import code relies on being able to look up the table in the session catalog
+ Preconditions.checkArgument(catalog.name().equalsIgnoreCase("spark_catalog"),
+ "Cannot snapshot a table that isn't in the session catalog (i.e. spark_catalog). " +
+ "Found source catalog: %s.", catalog.name());
Preconditions.checkArgument(catalog instanceof TableCatalog,
- "Cannot snapshot a table from a non-table catalog %s. Catalog has class of %s.", catalog.name(),
- catalog.getClass().toString());
+ "Cannot snapshot as catalog %s of class %s is not a table catalog",
+ catalog.name(), catalog.getClass().getName());
return (TableCatalog) catalog;
}
@Override
- public SnapshotAction withLocation(String location) {
+ public SnapshotTable tableLocation(String location) {
Preconditions.checkArgument(!sourceTableLocation().equals(location),
- "Cannot create snapshot where destination location is the same as the source location." +
- " This would cause a mixing of original table created and snapshot created files.");
+ "The snapshot table location cannot be the same as the source table location. " +
+ "This would mix snapshot table files with original table files.");
this.destTableLocation = location;
return this;
}
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3CreateAction.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
similarity index 74%
rename from spark3/src/main/java/org/apache/iceberg/actions/Spark3CreateAction.java
rename to spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
index a87776d..6eadece 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3CreateAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
@@ -17,13 +17,12 @@
* under the License.
*/
-package org.apache.iceberg.actions;
+package org.apache.iceberg.spark.actions;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
-import java.util.function.Supplier;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -36,12 +35,9 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.JobGroupUtils;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.iceberg.spark.source.StagedSparkTable;
-import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.catalyst.catalog.CatalogUtils;
@@ -53,15 +49,13 @@ import org.apache.spark.sql.connector.catalog.V1Table;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
-abstract class Spark3CreateAction implements CreateAction {
+abstract class BaseTableCreationSparkAction<ThisT, R> extends BaseSparkAction<ThisT, R> {
private static final Set<String> ALLOWED_SOURCES = ImmutableSet.of("parquet", "avro", "orc", "hive");
protected static final String LOCATION = "location";
protected static final String ICEBERG_METADATA_FOLDER = "metadata";
protected static final List<String> EXCLUDED_PROPERTIES =
ImmutableList.of("path", "transient_lastDdlTime", "serialization.format");
- private final SparkSession spark;
-
// Source Fields
private final V1Table sourceTable;
private final CatalogTable sourceCatalogTable;
@@ -69,21 +63,14 @@ abstract class Spark3CreateAction implements CreateAction {
private final TableCatalog sourceCatalog;
private final Identifier sourceTableIdent;
- // Destination Fields
- private final StagingTableCatalog destCatalog;
- private final Identifier destTableIdent;
-
// Optional Parameters for destination
private final Map<String, String> additionalProperties = Maps.newHashMap();
- Spark3CreateAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent,
- CatalogPlugin destCatalog, Identifier destTableIdent) {
+ BaseTableCreationSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+ super(spark);
- this.spark = spark;
this.sourceCatalog = checkSourceCatalog(sourceCatalog);
this.sourceTableIdent = sourceTableIdent;
- this.destCatalog = checkDestinationCatalog(destCatalog);
- this.destTableIdent = destTableIdent;
try {
this.sourceTable = (V1Table) this.sourceCatalog.loadTable(sourceTableIdent);
@@ -99,21 +86,13 @@ abstract class Spark3CreateAction implements CreateAction {
this.sourceTableLocation = CatalogUtils.URIToString(sourceCatalogTable.storage().locationUri().get());
}
- @Override
- public CreateAction withProperties(Map<String, String> properties) {
- this.additionalProperties.putAll(properties);
- return this;
- }
+ protected abstract TableCatalog checkSourceCatalog(CatalogPlugin catalog);
- @Override
- public CreateAction withProperty(String key, String value) {
- this.additionalProperties.put(key, value);
- return this;
- }
+ protected abstract StagingTableCatalog destCatalog();
- protected SparkSession spark() {
- return spark;
- }
+ protected abstract Identifier destTableIdent();
+
+ protected abstract Map<String, String> destTableProps();
protected String sourceTableLocation() {
return sourceTableLocation;
@@ -131,12 +110,12 @@ abstract class Spark3CreateAction implements CreateAction {
return sourceTableIdent;
}
- protected StagingTableCatalog destCatalog() {
- return destCatalog;
+ protected void setProperties(Map<String, String> properties) {
+ additionalProperties.putAll(properties);
}
- protected Identifier destTableIdent() {
- return destTableIdent;
+ protected void setProperty(String key, String value) {
+ additionalProperties.put(key, value);
}
protected Map<String, String> additionalProperties() {
@@ -151,7 +130,7 @@ abstract class Spark3CreateAction implements CreateAction {
"Cannot create an Iceberg table from a source without an explicit location");
}
- private StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) {
+ protected StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) {
Preconditions.checkArgument(catalog instanceof SparkSessionCatalog || catalog instanceof SparkCatalog,
"Cannot create Iceberg table in non-Iceberg Catalog. " +
"Catalog '%s' was of class '%s' but '%s' or '%s' are required",
@@ -163,15 +142,14 @@ abstract class Spark3CreateAction implements CreateAction {
protected StagedSparkTable stageDestTable() {
try {
- Map<String, String> props = targetTableProps();
+ Map<String, String> props = destTableProps();
StructType schema = sourceTable.schema();
Transform[] partitioning = sourceTable.partitioning();
- return (StagedSparkTable) destCatalog.stageCreate(destTableIdent, schema, partitioning, props);
+ return (StagedSparkTable) destCatalog().stageCreate(destTableIdent(), schema, partitioning, props);
} catch (org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException e) {
- throw new NoSuchNamespaceException("Cannot create a table '%s' because the namespace does not exist",
- destTableIdent);
+ throw new NoSuchNamespaceException("Cannot create table %s as the namespace does not exist", destTableIdent());
} catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
- throw new AlreadyExistsException("Cannot create table '%s' because it already exists", destTableIdent);
+ throw new AlreadyExistsException("Cannot create table %s as it already exists", destTableIdent());
}
}
@@ -187,20 +165,4 @@ abstract class Spark3CreateAction implements CreateAction {
return table.properties().getOrDefault(TableProperties.WRITE_METADATA_LOCATION,
table.location() + "/" + ICEBERG_METADATA_FOLDER);
}
-
- protected abstract Map<String, String> targetTableProps();
-
- protected abstract TableCatalog checkSourceCatalog(CatalogPlugin catalog);
-
- protected <T> T withJobGroupInfo(JobGroupInfo info, Supplier<T> supplier) {
- SparkContext context = spark().sparkContext();
- JobGroupInfo previousInfo = JobGroupUtils.getJobGroupInfo(context);
- try {
- JobGroupUtils.setJobGroupInfo(context, info);
- return supplier.get();
- } finally {
- JobGroupUtils.setJobGroupInfo(context, previousInfo);
- }
- }
-
}