Posted to commits@iceberg.apache.org by sz...@apache.org on 2022/10/18 18:28:56 UTC

[iceberg] branch master updated: Spark: Add option to remove backup table after successful migrate action (#5622)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 754cff910d Spark: Add option to remove backup table after successful migrate action (#5622)
754cff910d is described below

commit 754cff910d8e5bb45bb36113d3d8730a5bc9203c
Author: Rishi <sr...@gmail.com>
AuthorDate: Tue Oct 18 11:28:50 2022 -0700

    Spark: Add option to remove backup table after successful migrate action (#5622)
---
 .palantir/revapi.yml                                 |  3 +++
 .../org/apache/iceberg/actions/MigrateTable.java     |  7 +++++++
 .../spark/extensions/TestMigrateTableProcedure.java  | 16 ++++++++++++++++
 .../spark/actions/BaseMigrateTableSparkAction.java   | 19 +++++++++++++++++++
 .../spark/procedures/MigrateTableProcedure.java      | 17 ++++++++++++++---
 .../spark/extensions/TestMigrateTableProcedure.java  | 18 +++++++++++++++++-
 .../spark/actions/BaseMigrateTableSparkAction.java   | 19 +++++++++++++++++++
 .../spark/procedures/MigrateTableProcedure.java      | 17 ++++++++++++++---
 .../spark/extensions/TestMigrateTableProcedure.java  | 18 +++++++++++++++++-
 .../spark/actions/MigrateTableSparkAction.java       | 19 +++++++++++++++++++
 .../spark/procedures/MigrateTableProcedure.java      | 18 +++++++++++++++---
 .../spark/extensions/TestMigrateTableProcedure.java  | 20 ++++++++++++++++++--
 .../spark/actions/MigrateTableSparkAction.java       | 19 +++++++++++++++++++
 .../spark/procedures/MigrateTableProcedure.java      | 18 +++++++++++++++---
 14 files changed, 212 insertions(+), 16 deletions(-)
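
For context, here is a minimal usage sketch of the option this commit adds. The catalog and table names ("spark_catalog", "db.sample") are illustrative, not taken from the commit, and the two calls are alternative entry points rather than a sequence:

    import org.apache.iceberg.actions.MigrateTable;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class MigrateWithDropBackupExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // Procedure form: drop_backup is the new optional parameter.
        spark.sql(
            "CALL spark_catalog.system.migrate("
                + "table => 'db.sample', drop_backup => true)");

        // Action form: dropBackup() is the new fluent call on MigrateTable.
        MigrateTable.Result result =
            SparkActions.get(spark).migrateTable("db.sample").dropBackup().execute();
        System.out.println("Migrated data files: " + result.migratedDataFilesCount());
      }
    }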

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index ffbcbeee15..2edd123425 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -13,6 +13,9 @@ acceptedBreaks:
     - code: "java.method.addedToInterface"
       new: "method java.lang.String org.apache.iceberg.expressions.Reference<T>::name()"
       justification: "All subclasses implement name"
+    - code: "java.method.addedToInterface"
+      new: "method org.apache.iceberg.actions.MigrateTable org.apache.iceberg.actions.MigrateTable::dropBackup()"
+      justification: "Adding new functionality to allow for dropping backup table"
     - code: "java.method.addedToInterface"
       new: "method java.util.List<org.apache.iceberg.StatisticsFile> org.apache.iceberg.Table::statisticsFiles()"
       justification: "new API method"
diff --git a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
index e645fad217..36da1d6607 100644
--- a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
+++ b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
@@ -41,6 +41,13 @@ public interface MigrateTable extends Action<MigrateTable, MigrateTable.Result>
    */
   MigrateTable tableProperty(String name, String value);
 
+  /**
+   * Drops the backup of the original table after a successful migration.
+   *
+   * @return this for method chaining
+   */
+  MigrateTable dropBackup();
+
   /** The action result that contains a summary of the execution. */
   interface Result {
     /** Returns the number of migrated data files. */
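
Since dropBackup() returns this, it composes with the existing builder methods such as tableProperty(String, String). A hedged chaining sketch (the table name and the property value are illustrative):

    MigrateTable.Result result =
        SparkActions.get()
            .migrateTable("db.sample")
            .tableProperty("format-version", "2")
            .dropBackup()
            .execute();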
diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index f9c150a3b1..bfeeee8066 100644
--- a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -106,6 +106,22 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
     sql("DROP TABLE %s", tableName + "_BACKUP_");
   }
 
+  @Test
+  public void testMigrateWithDropBackup() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    Object result =
+        scalarSql(
+            "CALL %s.system.migrate(table => '%s', drop_backup => true)", catalogName, tableName);
+    Assert.assertEquals("Should have added one file", 1L, result);
+    Assert.assertFalse(spark.catalog().tableExists(tableName + "_BACKUP_"));
+  }
+
   @Test
   public void testMigrateWithInvalidMetricsConfig() throws IOException {
     Assume.assumeTrue(catalogName.equals("spark_catalog"));
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
index 856b67dbcd..67297da6c2 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
@@ -59,6 +59,8 @@ public class BaseMigrateTableSparkAction
   private final Identifier destTableIdent;
   private final Identifier backupIdent;
 
+  private boolean dropBackup = false;
+
   public BaseMigrateTableSparkAction(
       SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
     super(spark, sourceCatalog, sourceTableIdent);
@@ -95,6 +97,12 @@ public class BaseMigrateTableSparkAction
     return this;
   }
 
+  @Override
+  public MigrateTable dropBackup() {
+    this.dropBackup = true;
+    return this;
+  }
+
   @Override
   public MigrateTable.Result execute() {
     String desc = String.format("Migrating table %s", destTableIdent().toString());
@@ -143,6 +151,8 @@ public class BaseMigrateTableSparkAction
             LOG.error("Cannot abort staged changes", abortException);
           }
         }
+      } else if (dropBackup) {
+        dropBackupTable();
       }
     }
 
@@ -221,4 +231,13 @@ public class BaseMigrateTableSparkAction
           e);
     }
   }
+
+  private void dropBackupTable() {
+    try {
+      destCatalog().dropTable(backupIdent);
+    } catch (Exception e) {
+      LOG.error(
+          "Cannot drop the backup table {}, after the migration is completed.", backupIdent, e);
+    }
+  }
 }
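
Note the placement in the finally block above: the backup is dropped only on the success branch, so a failed migration still leaves the backup in place for recovery, and dropBackupTable() itself only logs a failure rather than rethrowing. A condensed sketch of that pattern (the success-flag name is an assumption based on surrounding context the diff elides):

    boolean threw = true;
    try {
      // ... stage and commit the Iceberg table created from the source ...
      threw = false;
    } finally {
      if (threw) {
        // restore the original table and abort staged changes, as above
      } else if (dropBackup) {
        dropBackupTable();  // best-effort: errors are logged, not rethrown
      }
    }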
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
index a49dd7d526..fee6414d93 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
@@ -37,7 +37,8 @@ class MigrateTableProcedure extends BaseProcedure {
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.optional("properties", STRING_MAP)
+        ProcedureParameter.optional("properties", STRING_MAP),
+        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -88,8 +89,18 @@ class MigrateTableProcedure extends BaseProcedure {
               });
     }
 
-    MigrateTable.Result result =
-        SparkActions.get().migrateTable(tableName).tableProperties(properties).execute();
+    boolean dropBackup = args.isNullAt(2) ? false : args.getBoolean(2);
+
+    MigrateTable migrateTable =
+        SparkActions.get().migrateTable(tableName).tableProperties(properties);
+
+    MigrateTable.Result result;
+    if (dropBackup) {
+      result = migrateTable.dropBackup().execute();
+    } else {
+      result = migrateTable.execute();
+    }
+
     return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())};
   }
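
The null check above makes the parameter default to false, so existing callers are unaffected. A sketch of the observable difference (names illustrative; the _BACKUP_ suffix matches the tests in this commit):

    // Omitting drop_backup preserves the previous behavior: the backup survives.
    spark.sql("CALL spark_catalog.system.migrate(table => 'db.sample')");
    boolean kept = spark.catalog().tableExists("db.sample_BACKUP_");    // true

    // Opting in removes the backup once the migration succeeds.
    spark.sql("CALL spark_catalog.system.migrate(table => 'db.other', drop_backup => true)");
    boolean gone = !spark.catalog().tableExists("db.other_BACKUP_");    // true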
 
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index f9c150a3b1..83126b700e 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -103,7 +103,23 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM %s ORDER BY id", tableName));
 
-    sql("DROP TABLE %s", tableName + "_BACKUP_");
+    sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
+  }
+
+  @Test
+  public void testMigrateWithDropBackup() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    Object result =
+        scalarSql(
+            "CALL %s.system.migrate(table => '%s', drop_backup => true)", catalogName, tableName);
+    Assert.assertEquals("Should have added one file", 1L, result);
+    Assert.assertFalse(spark.catalog().tableExists(tableName + "_BACKUP_"));
   }
 
   @Test
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
index 856b67dbcd..67297da6c2 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
@@ -59,6 +59,8 @@ public class BaseMigrateTableSparkAction
   private final Identifier destTableIdent;
   private final Identifier backupIdent;
 
+  private boolean dropBackup = false;
+
   public BaseMigrateTableSparkAction(
       SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
     super(spark, sourceCatalog, sourceTableIdent);
@@ -95,6 +97,12 @@ public class BaseMigrateTableSparkAction
     return this;
   }
 
+  @Override
+  public MigrateTable dropBackup() {
+    this.dropBackup = true;
+    return this;
+  }
+
   @Override
   public MigrateTable.Result execute() {
     String desc = String.format("Migrating table %s", destTableIdent().toString());
@@ -143,6 +151,8 @@ public class BaseMigrateTableSparkAction
             LOG.error("Cannot abort staged changes", abortException);
           }
         }
+      } else if (dropBackup) {
+        dropBackupTable();
       }
     }
 
@@ -221,4 +231,13 @@ public class BaseMigrateTableSparkAction
           e);
     }
   }
+
+  private void dropBackupTable() {
+    try {
+      destCatalog().dropTable(backupIdent);
+    } catch (Exception e) {
+      LOG.error(
+          "Cannot drop the backup table {}, after the migration is completed.", backupIdent, e);
+    }
+  }
 }
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
index a49dd7d526..fee6414d93 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
@@ -37,7 +37,8 @@ class MigrateTableProcedure extends BaseProcedure {
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.optional("properties", STRING_MAP)
+        ProcedureParameter.optional("properties", STRING_MAP),
+        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -88,8 +89,18 @@ class MigrateTableProcedure extends BaseProcedure {
               });
     }
 
-    MigrateTable.Result result =
-        SparkActions.get().migrateTable(tableName).tableProperties(properties).execute();
+    boolean dropBackup = args.isNullAt(2) ? false : args.getBoolean(2);
+
+    MigrateTable migrateTable =
+        SparkActions.get().migrateTable(tableName).tableProperties(properties);
+
+    MigrateTable.Result result;
+    if (dropBackup) {
+      result = migrateTable.dropBackup().execute();
+    } else {
+      result = migrateTable.execute();
+    }
+
     return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())};
   }
 
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index f9c150a3b1..83126b700e 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -103,7 +103,23 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM %s ORDER BY id", tableName));
 
-    sql("DROP TABLE %s", tableName + "_BACKUP_");
+    sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
+  }
+
+  @Test
+  public void testMigrateWithDropBackup() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    Object result =
+        scalarSql(
+            "CALL %s.system.migrate(table => '%s', drop_backup => true)", catalogName, tableName);
+    Assert.assertEquals("Should have added one file", 1L, result);
+    Assert.assertFalse(spark.catalog().tableExists(tableName + "_BACKUP_"));
   }
 
   @Test
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
index e5716ea153..9106f97e47 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
@@ -58,6 +58,8 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
   private final Identifier destTableIdent;
   private final Identifier backupIdent;
 
+  private boolean dropBackup = false;
+
   MigrateTableSparkAction(
       SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
     super(spark, sourceCatalog, sourceTableIdent);
@@ -94,6 +96,12 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
     return this;
   }
 
+  @Override
+  public MigrateTableSparkAction dropBackup() {
+    this.dropBackup = true;
+    return this;
+  }
+
   @Override
   public MigrateTable.Result execute() {
     String desc = String.format("Migrating table %s", destTableIdent().toString());
@@ -142,6 +150,8 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
             LOG.error("Cannot abort staged changes", abortException);
           }
         }
+      } else if (dropBackup) {
+        dropBackupTable();
       }
     }
 
@@ -220,4 +230,13 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
           e);
     }
   }
+
+  private void dropBackupTable() {
+    try {
+      destCatalog().dropTable(backupIdent);
+    } catch (Exception e) {
+      LOG.error(
+          "Cannot drop the backup table {}, after the migration is completed.", backupIdent, e);
+    }
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
index a49dd7d526..e6467accdc 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.iceberg.actions.MigrateTable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.actions.MigrateTableSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -37,7 +38,8 @@ class MigrateTableProcedure extends BaseProcedure {
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.optional("properties", STRING_MAP)
+        ProcedureParameter.optional("properties", STRING_MAP),
+        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -88,8 +90,18 @@ class MigrateTableProcedure extends BaseProcedure {
               });
     }
 
-    MigrateTable.Result result =
-        SparkActions.get().migrateTable(tableName).tableProperties(properties).execute();
+    boolean dropBackup = args.isNullAt(2) ? false : args.getBoolean(2);
+
+    MigrateTableSparkAction migrateTableSparkAction =
+        SparkActions.get().migrateTable(tableName).tableProperties(properties);
+
+    final MigrateTable.Result result;
+    if (dropBackup) {
+      result = migrateTableSparkAction.dropBackup().execute();
+    } else {
+      result = migrateTableSparkAction.execute();
+    }
+
     return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())};
   }
 
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index f9c150a3b1..8b2950b74f 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -71,7 +71,7 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM %s ORDER BY id", tableName));
 
-    sql("DROP TABLE %s", tableName + "_BACKUP_");
+    sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
   }
 
   @Test
@@ -103,7 +103,23 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM %s ORDER BY id", tableName));
 
-    sql("DROP TABLE %s", tableName + "_BACKUP_");
+    sql("DROP TABLE IF EXISTS  %s", tableName + "_BACKUP_");
+  }
+
+  @Test
+  public void testMigrateWithDropBackup() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    Object result =
+        scalarSql(
+            "CALL %s.system.migrate(table => '%s', drop_backup => true)", catalogName, tableName);
+    Assert.assertEquals("Should have added one file", 1L, result);
+    Assert.assertFalse(spark.catalog().tableExists(tableName + "_BACKUP_"));
   }
 
   @Test
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
index e5716ea153..9106f97e47 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
@@ -58,6 +58,8 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
   private final Identifier destTableIdent;
   private final Identifier backupIdent;
 
+  private boolean dropBackup = false;
+
   MigrateTableSparkAction(
       SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
     super(spark, sourceCatalog, sourceTableIdent);
@@ -94,6 +96,12 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
     return this;
   }
 
+  @Override
+  public MigrateTableSparkAction dropBackup() {
+    this.dropBackup = true;
+    return this;
+  }
+
   @Override
   public MigrateTable.Result execute() {
     String desc = String.format("Migrating table %s", destTableIdent().toString());
@@ -142,6 +150,8 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
             LOG.error("Cannot abort staged changes", abortException);
           }
         }
+      } else if (dropBackup) {
+        dropBackupTable();
       }
     }
 
@@ -220,4 +230,13 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
           e);
     }
   }
+
+  private void dropBackupTable() {
+    try {
+      destCatalog().dropTable(backupIdent);
+    } catch (Exception e) {
+      LOG.error(
+          "Cannot drop the backup table {}, after the migration is completed.", backupIdent, e);
+    }
+  }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
index a49dd7d526..aaa6d2cb23 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.iceberg.actions.MigrateTable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.actions.MigrateTableSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -37,7 +38,8 @@ class MigrateTableProcedure extends BaseProcedure {
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.optional("properties", STRING_MAP)
+        ProcedureParameter.optional("properties", STRING_MAP),
+        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -88,8 +90,18 @@ class MigrateTableProcedure extends BaseProcedure {
               });
     }
 
-    MigrateTable.Result result =
-        SparkActions.get().migrateTable(tableName).tableProperties(properties).execute();
+    boolean dropBackup = args.isNullAt(2) ? false : args.getBoolean(2);
+
+    MigrateTableSparkAction migrateTableSparkAction =
+        SparkActions.get().migrateTable(tableName).tableProperties(properties);
+
+    MigrateTable.Result result;
+    if (dropBackup) {
+      result = migrateTableSparkAction.dropBackup().execute();
+    } else {
+      result = migrateTableSparkAction.execute();
+    }
+
     return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())};
   }