Posted to commits@iceberg.apache.org by sz...@apache.org on 2022/10/18 18:28:56 UTC
[iceberg] branch master updated: Spark: Add option to remove backup table after successful migrate action (#5622)
This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 754cff910d Spark: Add option to remove backup table after successful migrate action (#5622)
754cff910d is described below
commit 754cff910d8e5bb45bb36113d3d8730a5bc9203c
Author: Rishi <sr...@gmail.com>
AuthorDate: Tue Oct 18 11:28:50 2022 -0700
Spark: Add option to remove backup table after successful migrate action (#5622)
---
.palantir/revapi.yml | 3 +++
.../org/apache/iceberg/actions/MigrateTable.java | 7 +++++++
.../spark/extensions/TestMigrateTableProcedure.java | 16 ++++++++++++++++
.../spark/actions/BaseMigrateTableSparkAction.java | 19 +++++++++++++++++++
.../spark/procedures/MigrateTableProcedure.java | 17 ++++++++++++++---
.../spark/extensions/TestMigrateTableProcedure.java | 18 +++++++++++++++++-
.../spark/actions/BaseMigrateTableSparkAction.java | 19 +++++++++++++++++++
.../spark/procedures/MigrateTableProcedure.java | 17 ++++++++++++++---
.../spark/extensions/TestMigrateTableProcedure.java | 18 +++++++++++++++++-
.../spark/actions/MigrateTableSparkAction.java | 19 +++++++++++++++++++
.../spark/procedures/MigrateTableProcedure.java | 18 +++++++++++++++---
.../spark/extensions/TestMigrateTableProcedure.java | 20 ++++++++++++++++++--
.../spark/actions/MigrateTableSparkAction.java | 19 +++++++++++++++++++
.../spark/procedures/MigrateTableProcedure.java | 18 +++++++++++++++---
14 files changed, 212 insertions(+), 16 deletions(-)
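For readers who only want the user-facing change before the diff: the migrate procedure gains an optional drop_backup boolean argument; when omitted, the backup table is kept as before. A minimal usage sketch, assuming a Spark session with the Iceberg SQL extensions enabled and the session catalog registered as spark_catalog; db.sample is a placeholder table name:

    // Hypothetical example: migrate a parquet table in place and drop the
    // "<table>_BACKUP_" copy once the migration succeeds.
    spark.sql(
        "CALL spark_catalog.system.migrate(table => 'db.sample', drop_backup => true)");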
diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index ffbcbeee15..2edd123425 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -13,6 +13,9 @@ acceptedBreaks:
- code: "java.method.addedToInterface"
new: "method java.lang.String org.apache.iceberg.expressions.Reference<T>::name()"
justification: "All subclasses implement name"
+ - code: "java.method.addedToInterface"
+ new: "method org.apache.iceberg.actions.MigrateTable org.apache.iceberg.actions.MigrateTable::dropBackup()"
+ justification: "Adding new functionality to allow for dropping backup table"
- code: "java.method.addedToInterface"
new: "method java.util.List<org.apache.iceberg.StatisticsFile> org.apache.iceberg.Table::statisticsFiles()"
justification: "new API method"
diff --git a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
index e645fad217..36da1d6607 100644
--- a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
+++ b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
@@ -41,6 +41,13 @@ public interface MigrateTable extends Action<MigrateTable, MigrateTable.Result>
*/
MigrateTable tableProperty(String name, String value);
+ /**
+ * Drops the backup of the original table after a successful migration
+ *
+ * @return this for method chaining
+ */
+ MigrateTable dropBackup();
+
/** The action result that contains a summary of the execution. */
interface Result {
/** Returns the number of migrated data files. */
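The same behavior is reachable through the action API extended above. A minimal sketch of the fluent chain, assuming a configured Spark session and SparkActions; the table name is a placeholder, not taken from the commit:

    // Hypothetical example of the Java action API; dropBackup() is the method added in this commit.
    import org.apache.iceberg.actions.MigrateTable;
    import org.apache.iceberg.spark.actions.SparkActions;

    MigrateTable.Result result =
        SparkActions.get()
            .migrateTable("spark_catalog.db.sample")  // placeholder source table
            .dropBackup()                             // remove the backup after a successful migrate
            .execute();
    long migratedFiles = result.migratedDataFilesCount();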
diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index f9c150a3b1..bfeeee8066 100644
--- a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -106,6 +106,22 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
sql("DROP TABLE %s", tableName + "_BACKUP_");
}
+ @Test
+ public void testMigrateWithDropBackup() throws IOException {
+ Assume.assumeTrue(catalogName.equals("spark_catalog"));
+ String location = temp.newFolder().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+ Object result =
+ scalarSql(
+ "CALL %s.system.migrate(table => '%s', drop_backup => true)", catalogName, tableName);
+ Assert.assertEquals("Should have added one file", 1L, result);
+ Assert.assertFalse(spark.catalog().tableExists(tableName + "_BACKUP_"));
+ }
+
@Test
public void testMigrateWithInvalidMetricsConfig() throws IOException {
Assume.assumeTrue(catalogName.equals("spark_catalog"));
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
index 856b67dbcd..67297da6c2 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
@@ -59,6 +59,8 @@ public class BaseMigrateTableSparkAction
private final Identifier destTableIdent;
private final Identifier backupIdent;
+ private boolean dropBackup = false;
+
public BaseMigrateTableSparkAction(
SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
super(spark, sourceCatalog, sourceTableIdent);
@@ -95,6 +97,12 @@ public class BaseMigrateTableSparkAction
return this;
}
+ @Override
+ public MigrateTable dropBackup() {
+ this.dropBackup = true;
+ return this;
+ }
+
@Override
public MigrateTable.Result execute() {
String desc = String.format("Migrating table %s", destTableIdent().toString());
@@ -143,6 +151,8 @@ public class BaseMigrateTableSparkAction
LOG.error("Cannot abort staged changes", abortException);
}
}
+ } else if (dropBackup) {
+ dropBackupTable();
}
}
@@ -221,4 +231,13 @@ public class BaseMigrateTableSparkAction
e);
}
}
+
+ private void dropBackupTable() {
+ try {
+ destCatalog().dropTable(backupIdent);
+ } catch (Exception e) {
+ LOG.error(
+ "Cannot drop the backup table {}, after the migration is completed.", backupIdent, e);
+ }
+ }
}
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
index a49dd7d526..fee6414d93 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
@@ -37,7 +37,8 @@ class MigrateTableProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
- ProcedureParameter.optional("properties", STRING_MAP)
+ ProcedureParameter.optional("properties", STRING_MAP),
+ ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
};
private static final StructType OUTPUT_TYPE =
@@ -88,8 +89,18 @@ class MigrateTableProcedure extends BaseProcedure {
});
}
- MigrateTable.Result result =
- SparkActions.get().migrateTable(tableName).tableProperties(properties).execute();
+ boolean dropBackup = args.isNullAt(2) ? false : args.getBoolean(2);
+
+ MigrateTable migrateTable =
+ SparkActions.get().migrateTable(tableName).tableProperties(properties);
+
+ MigrateTable.Result result;
+ if (dropBackup) {
+ result = migrateTable.dropBackup().execute();
+ } else {
+ result = migrateTable.execute();
+ }
+
return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())};
}
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index f9c150a3b1..83126b700e 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -103,7 +103,23 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
ImmutableList.of(row(1L, "a"), row(1L, "a")),
sql("SELECT * FROM %s ORDER BY id", tableName));
- sql("DROP TABLE %s", tableName + "_BACKUP_");
+ sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
+ }
+
+ @Test
+ public void testMigrateWithDropBackup() throws IOException {
+ Assume.assumeTrue(catalogName.equals("spark_catalog"));
+ String location = temp.newFolder().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+ Object result =
+ scalarSql(
+ "CALL %s.system.migrate(table => '%s', drop_backup => true)", catalogName, tableName);
+ Assert.assertEquals("Should have added one file", 1L, result);
+ Assert.assertFalse(spark.catalog().tableExists(tableName + "_BACKUP_"));
}
@Test
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
index 856b67dbcd..67297da6c2 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
@@ -59,6 +59,8 @@ public class BaseMigrateTableSparkAction
private final Identifier destTableIdent;
private final Identifier backupIdent;
+ private boolean dropBackup = false;
+
public BaseMigrateTableSparkAction(
SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
super(spark, sourceCatalog, sourceTableIdent);
@@ -95,6 +97,12 @@ public class BaseMigrateTableSparkAction
return this;
}
+ @Override
+ public MigrateTable dropBackup() {
+ this.dropBackup = true;
+ return this;
+ }
+
@Override
public MigrateTable.Result execute() {
String desc = String.format("Migrating table %s", destTableIdent().toString());
@@ -143,6 +151,8 @@ public class BaseMigrateTableSparkAction
LOG.error("Cannot abort staged changes", abortException);
}
}
+ } else if (dropBackup) {
+ dropBackupTable();
}
}
@@ -221,4 +231,13 @@ public class BaseMigrateTableSparkAction
e);
}
}
+
+ private void dropBackupTable() {
+ try {
+ destCatalog().dropTable(backupIdent);
+ } catch (Exception e) {
+ LOG.error(
+ "Cannot drop the backup table {}, after the migration is completed.", backupIdent, e);
+ }
+ }
}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
index a49dd7d526..fee6414d93 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
@@ -37,7 +37,8 @@ class MigrateTableProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
- ProcedureParameter.optional("properties", STRING_MAP)
+ ProcedureParameter.optional("properties", STRING_MAP),
+ ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
};
private static final StructType OUTPUT_TYPE =
@@ -88,8 +89,18 @@ class MigrateTableProcedure extends BaseProcedure {
});
}
- MigrateTable.Result result =
- SparkActions.get().migrateTable(tableName).tableProperties(properties).execute();
+ boolean dropBackup = args.isNullAt(2) ? false : args.getBoolean(2);
+
+ MigrateTable migrateTable =
+ SparkActions.get().migrateTable(tableName).tableProperties(properties);
+
+ MigrateTable.Result result;
+ if (dropBackup) {
+ result = migrateTable.dropBackup().execute();
+ } else {
+ result = migrateTable.execute();
+ }
+
return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())};
}
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index f9c150a3b1..83126b700e 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -103,7 +103,23 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
ImmutableList.of(row(1L, "a"), row(1L, "a")),
sql("SELECT * FROM %s ORDER BY id", tableName));
- sql("DROP TABLE %s", tableName + "_BACKUP_");
+ sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
+ }
+
+ @Test
+ public void testMigrateWithDropBackup() throws IOException {
+ Assume.assumeTrue(catalogName.equals("spark_catalog"));
+ String location = temp.newFolder().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+ Object result =
+ scalarSql(
+ "CALL %s.system.migrate(table => '%s', drop_backup => true)", catalogName, tableName);
+ Assert.assertEquals("Should have added one file", 1L, result);
+ Assert.assertFalse(spark.catalog().tableExists(tableName + "_BACKUP_"));
}
@Test
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
index e5716ea153..9106f97e47 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
@@ -58,6 +58,8 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
private final Identifier destTableIdent;
private final Identifier backupIdent;
+ private boolean dropBackup = false;
+
MigrateTableSparkAction(
SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
super(spark, sourceCatalog, sourceTableIdent);
@@ -94,6 +96,12 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
return this;
}
+ @Override
+ public MigrateTableSparkAction dropBackup() {
+ this.dropBackup = true;
+ return this;
+ }
+
@Override
public MigrateTable.Result execute() {
String desc = String.format("Migrating table %s", destTableIdent().toString());
@@ -142,6 +150,8 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
LOG.error("Cannot abort staged changes", abortException);
}
}
+ } else if (dropBackup) {
+ dropBackupTable();
}
}
@@ -220,4 +230,13 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
e);
}
}
+
+ private void dropBackupTable() {
+ try {
+ destCatalog().dropTable(backupIdent);
+ } catch (Exception e) {
+ LOG.error(
+ "Cannot drop the backup table {}, after the migration is completed.", backupIdent, e);
+ }
+ }
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
index a49dd7d526..e6467accdc 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.iceberg.actions.MigrateTable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.actions.MigrateTableSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -37,7 +38,8 @@ class MigrateTableProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
- ProcedureParameter.optional("properties", STRING_MAP)
+ ProcedureParameter.optional("properties", STRING_MAP),
+ ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
};
private static final StructType OUTPUT_TYPE =
@@ -88,8 +90,18 @@ class MigrateTableProcedure extends BaseProcedure {
});
}
- MigrateTable.Result result =
- SparkActions.get().migrateTable(tableName).tableProperties(properties).execute();
+ boolean dropBackup = args.isNullAt(2) ? false : args.getBoolean(2);
+
+ MigrateTableSparkAction migrateTableSparkAction =
+ SparkActions.get().migrateTable(tableName).tableProperties(properties);
+
+ final MigrateTable.Result result;
+ if (dropBackup) {
+ result = migrateTableSparkAction.dropBackup().execute();
+ } else {
+ result = migrateTableSparkAction.execute();
+ }
+
return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())};
}
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index f9c150a3b1..8b2950b74f 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -71,7 +71,7 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
ImmutableList.of(row(1L, "a"), row(1L, "a")),
sql("SELECT * FROM %s ORDER BY id", tableName));
- sql("DROP TABLE %s", tableName + "_BACKUP_");
+ sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
}
@Test
@@ -103,7 +103,23 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
ImmutableList.of(row(1L, "a"), row(1L, "a")),
sql("SELECT * FROM %s ORDER BY id", tableName));
- sql("DROP TABLE %s", tableName + "_BACKUP_");
+ sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
+ }
+
+ @Test
+ public void testMigrateWithDropBackup() throws IOException {
+ Assume.assumeTrue(catalogName.equals("spark_catalog"));
+ String location = temp.newFolder().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+ Object result =
+ scalarSql(
+ "CALL %s.system.migrate(table => '%s', drop_backup => true)", catalogName, tableName);
+ Assert.assertEquals("Should have added one file", 1L, result);
+ Assert.assertFalse(spark.catalog().tableExists(tableName + "_BACKUP_"));
}
@Test
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
index e5716ea153..9106f97e47 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
@@ -58,6 +58,8 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
private final Identifier destTableIdent;
private final Identifier backupIdent;
+ private boolean dropBackup = false;
+
MigrateTableSparkAction(
SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
super(spark, sourceCatalog, sourceTableIdent);
@@ -94,6 +96,12 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
return this;
}
+ @Override
+ public MigrateTableSparkAction dropBackup() {
+ this.dropBackup = true;
+ return this;
+ }
+
@Override
public MigrateTable.Result execute() {
String desc = String.format("Migrating table %s", destTableIdent().toString());
@@ -142,6 +150,8 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
LOG.error("Cannot abort staged changes", abortException);
}
}
+ } else if (dropBackup) {
+ dropBackupTable();
}
}
@@ -220,4 +230,13 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
e);
}
}
+
+ private void dropBackupTable() {
+ try {
+ destCatalog().dropTable(backupIdent);
+ } catch (Exception e) {
+ LOG.error(
+ "Cannot drop the backup table {}, after the migration is completed.", backupIdent, e);
+ }
+ }
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
index a49dd7d526..aaa6d2cb23 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.iceberg.actions.MigrateTable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.actions.MigrateTableSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -37,7 +38,8 @@ class MigrateTableProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
- ProcedureParameter.optional("properties", STRING_MAP)
+ ProcedureParameter.optional("properties", STRING_MAP),
+ ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
};
private static final StructType OUTPUT_TYPE =
@@ -88,8 +90,18 @@ class MigrateTableProcedure extends BaseProcedure {
});
}
- MigrateTable.Result result =
- SparkActions.get().migrateTable(tableName).tableProperties(properties).execute();
+ boolean dropBackup = args.isNullAt(2) ? false : args.getBoolean(2);
+
+ MigrateTableSparkAction migrateTableSparkAction =
+ SparkActions.get().migrateTable(tableName).tableProperties(properties);
+
+ MigrateTable.Result result;
+ if (dropBackup) {
+ result = migrateTableSparkAction.dropBackup().execute();
+ } else {
+ result = migrateTableSparkAction.execute();
+ }
+
return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())};
}