Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/07/21 23:33:13 UTC

[GitHub] [iceberg] ericlgoodman opened a new pull request, #5331: WIP: Adding support for Delta to Iceberg migration

ericlgoodman opened a new pull request, #5331:
URL: https://github.com/apache/iceberg/pull/5331

   **This is a WIP PR (adding unit tests and still testing edge cases) but wanted to put this out here to get initial feedback.**
   
   It adds support for migrating a Delta Lake table to an Iceberg table. Currently this only creates a new table and effectively compresses the history of the Delta Lake table into a single commit. In the future, we can add functionality to optionally carry over the history of the Delta Lake table.
   
   It also assumes that the user's current Delta Lake table resides in what has now been configured to be an Iceberg catalog. Thinking of adding an optional `destinationCatalog` parameter for scenarios where users are attempting to move between catalogs.
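   
   For reference, a rough sketch of how this action might be invoked (hedged: the constructor is package-private in this PR, so the eventual public entry point would presumably be a `SparkActions`-style factory; the catalog and identifier wiring below is illustrative only):
   
   ```java
   // Illustrative only: assumes the action is reachable from user code and that the
   // current Spark catalog is the Iceberg-configured destination catalog.
   SparkSession spark = SparkSession.builder().getOrCreate();
   CatalogPlugin destCatalog = spark.sessionState().catalogManager().currentCatalog();
   Identifier newIdent = Identifier.of(new String[] {"db"}, "migrated_table");
   
   // Result type assumed to be nested in the MigrateDeltaLakeTable interface,
   // as with other Iceberg actions.
   MigrateDeltaLakeTable.Result result =
       new MigrateDeltaLakeTableSparkAction(spark, destCatalog, "s3://bucket/path/to/delta_table", newIdent)
           .execute();
   ```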




[GitHub] [iceberg] jackye1995 commented on a diff in pull request #5331: WIP: Adding support for Delta to Iceberg migration

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on code in PR #5331:
URL: https://github.com/apache/iceberg/pull/5331#discussion_r954092386


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateDeltaLakeTableSparkAction.java:
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.VersionLog;
+import io.delta.standalone.actions.Action;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.RemoveFile;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseMigrateDeltaLakeTableActionResult;
+import org.apache.iceberg.actions.MigrateDeltaLakeTable;
+import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.expressions.LogicalExpressions;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * Takes a Delta Lake table and attempts to transform it into an Iceberg table in the same location
+ * with the same identifier. Once complete the identifier which previously referred to a non-Iceberg
+ * table will refer to the newly migrated Iceberg table.
+ */
+public class MigrateDeltaLakeTableSparkAction implements MigrateDeltaLakeTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MigrateDeltaLakeTableSparkAction.class);
+
+  private final Map<String, String> additionalProperties = Maps.newHashMap();
+  private final SparkSession spark;
+  private final DeltaLog deltaLog;
+  private final StagingTableCatalog destCatalog;
+  private final String deltaTableLocation;
+  private final Identifier newIdentifier;
+
+  MigrateDeltaLakeTableSparkAction(

Review Comment:
   and that class can live in iceberg-core library.





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #5331: WIP: Adding support for Delta to Iceberg migration

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on code in PR #5331:
URL: https://github.com/apache/iceberg/pull/5331#discussion_r953981719


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateDeltaLakeTableSparkAction.java:
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.VersionLog;
+import io.delta.standalone.actions.Action;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.RemoveFile;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseMigrateDeltaLakeTableActionResult;
+import org.apache.iceberg.actions.MigrateDeltaLakeTable;
+import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.expressions.LogicalExpressions;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * Takes a Delta Lake table and attempts to transform it into an Iceberg table in the same location
+ * with the same identifier. Once complete the identifier which previously referred to a non-Iceberg
+ * table will refer to the newly migrated Iceberg table.
+ */
+public class MigrateDeltaLakeTableSparkAction implements MigrateDeltaLakeTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MigrateDeltaLakeTableSparkAction.class);
+
+  private final Map<String, String> additionalProperties = Maps.newHashMap();
+  private final SparkSession spark;
+  private final DeltaLog deltaLog;
+  private final StagingTableCatalog destCatalog;
+  private final String deltaTableLocation;
+  private final Identifier newIdentifier;
+
+  MigrateDeltaLakeTableSparkAction(
+      SparkSession spark,
+      CatalogPlugin destCatalog,
+      String deltaTableLocation,
+      Identifier newIdentifier) {
+    this.spark = spark;
+    this.destCatalog = checkDestinationCatalog(destCatalog);
+    this.newIdentifier = newIdentifier;
+    this.deltaTableLocation = deltaTableLocation;
+    this.deltaLog =
+        DeltaLog.forTable(spark.sessionState().newHadoopConf(), this.deltaTableLocation);
+  }
+
+  @Override
+  public Result execute() {
+    // Get a DeltaLog instance and retrieve the partitions (if applicable) of the table
+    io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
+
+    StructType structType = getStructTypeFromDeltaSnapshot();
+
+    StagedSparkTable stagedTable =
+        stageDestTable(
+            updatedSnapshot,
+            deltaTableLocation,
+            destCatalog,
+            newIdentifier,
+            structType,
+            additionalProperties);
+    PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(structType);
+
+    Table icebergTable = stagedTable.table();
+    copyFromDeltaLakeToIceberg(icebergTable, partitionSpec);
+
+    stagedTable.commitStagedChanges();
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long totalDataFiles =
+        Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info(
+        "Successfully loaded Iceberg metadata for {} files to {}",
+        totalDataFiles,
+        deltaTableLocation);
+    return new BaseMigrateDeltaLakeTableActionResult(totalDataFiles);
+  }
+
+  private void copyFromDeltaLakeToIceberg(Table table, PartitionSpec spec) {
+    // Get all changes starting from version 0
+    Iterator<VersionLog> it = deltaLog.getChanges(0, false);
+
+    while (it.hasNext()) {
+      VersionLog versionLog = it.next();
+      List<Action> actions = versionLog.getActions();
+
+      // We first need to iterate through to see what kind of transaction this was. There are 3
+      // cases:
+      // 1. AppendFile - when there are only AddFile instances (an INSERT on the table)
+      // 2. DeleteFiles - when there are only RemoveFile instances (a DELETE where all the records
+      // of file(s) were removed)
+      // 3. OverwriteFiles - when there are a mix of AddFile and RemoveFile (a DELETE/UPDATE)
+
+      // Create a map of Delta Lake Action (AddFile, RemoveFile, etc.) --> List<Action>
+      Map<String, List<Action>> deltaLakeActionsMap =
+          actions.stream()
+              .filter(action -> action instanceof AddFile || action instanceof RemoveFile)
+              .collect(Collectors.groupingBy(a -> a.getClass().getSimpleName()));
+      // Scan the map so that we know what type of transaction this will be in Iceberg
+      IcebergTransactionType icebergTransactionType =
+          getIcebergTransactionTypeFromDeltaActions(deltaLakeActionsMap);
+      if (icebergTransactionType == null) {
+        return;
+      }
+
+      List<DataFile> filesToAdd = Lists.newArrayList();
+      List<DataFile> filesToRemove = Lists.newArrayList();
+      for (Action action : Iterables.concat(deltaLakeActionsMap.values())) {
+        DataFile dataFile = buildDataFileForAction(action, table, spec);
+        if (action instanceof AddFile) {
+          filesToAdd.add(dataFile);
+        } else {
+          // We would have thrown an exception above if it wasn't a RemoveFile
+          filesToRemove.add(dataFile);
+        }
+      }
+
+      switch (icebergTransactionType) {
+        case APPEND_FILES:
+          AppendFiles appendFiles = table.newAppend();
+          filesToAdd.forEach(appendFiles::appendFile);
+          appendFiles.commit();
+          break;
+        case DELETE_FILES:
+          DeleteFiles deleteFiles = table.newDelete();
+          filesToRemove.forEach(deleteFiles::deleteFile);
+          deleteFiles.commit();
+          break;
+        case OVERWRITE_FILES:
+          OverwriteFiles overwriteFiles = table.newOverwrite();
+          filesToAdd.forEach(overwriteFiles::addFile);
+          filesToRemove.forEach(overwriteFiles::deleteFile);
+          overwriteFiles.commit();
+          break;
+      }
+    }
+  }
+
+  private DataFile buildDataFileForAction(Action action, Table table, PartitionSpec spec) {
+    String path;
+    long size;
+    Map<String, String> partitionValues;
+
+    if (action instanceof AddFile) {
+      AddFile addFile = (AddFile) action;
+      path = addFile.getPath();
+      size = addFile.getSize();
+      partitionValues = addFile.getPartitionValues();
+    } else if (action instanceof RemoveFile) {
+      RemoveFile removeFile = (RemoveFile) action;
+      path = removeFile.getPath();
+      size =
+          removeFile
+              .getSize()
+              .orElseThrow(
+                  () ->
+                      new RuntimeException(
+                          String.format("File %s removed without specifying a size", path)));
+      partitionValues =
+          Optional.ofNullable(removeFile.getPartitionValues())
+              .orElseThrow(
+                  () ->
+                      new RuntimeException(
+                          String.format(
+                              "File %s removed without specifying partition values", path)));
+    } else {
+      throw new IllegalStateException(
+          String.format(
+              "Unexpected action type for Delta Lake: %s", action.getClass().getSimpleName()));
+    }
+
+    String fullFilePath = deltaLog.getPath().toString() + File.separator + path;
+    Metrics metrics = getMetricsForFile(table, fullFilePath);
+
+    String partition =
+        spec.fields().stream()
+            .map(PartitionField::name)
+            .map(name -> String.format("%s=%s", name, partitionValues.get(name)))
+            .collect(Collectors.joining("/"));
+
+    return DataFiles.builder(spec)
+        .withPath(fullFilePath)
+        .withFormat(FileFormat.PARQUET)
+        .withFileSizeInBytes(size)
+        .withMetrics(metrics)
+        .withPartitionPath(partition)
+        .withRecordCount(metrics.recordCount())
+        .build();
+  }
+
+  private Metrics getMetricsForFile(Table table, String fullFilePath) {
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+    String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+    NameMapping nameMapping =
+        nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
+    return TableMigrationUtil.getParquetMetrics(
+        new Path(fullFilePath), spark.sessionState().newHadoopConf(), metricsConfig, nameMapping);
+  }
+
+  @Nullable
+  private IcebergTransactionType getIcebergTransactionTypeFromDeltaActions(
+      Map<String, List<Action>> actionsMap) {
+    IcebergTransactionType icebergTransactionType;
+    if (actionsMap.containsKey(AddFile.class.getSimpleName())
+        && !actionsMap.containsKey(RemoveFile.class.getSimpleName())) {
+      icebergTransactionType = IcebergTransactionType.APPEND_FILES;
+    } else if (actionsMap.containsKey(RemoveFile.class.getSimpleName())
+        && !actionsMap.containsKey(AddFile.class.getSimpleName())) {
+      icebergTransactionType = IcebergTransactionType.DELETE_FILES;
+    } else if (actionsMap.containsKey(AddFile.class.getSimpleName())
+        && actionsMap.containsKey(RemoveFile.class.getSimpleName())) {
+      icebergTransactionType = IcebergTransactionType.OVERWRITE_FILES;
+    } else {
+      // Some other type of transaction, we can ignore
+      return null;
+    }
+    return icebergTransactionType;
+  }
+
+  private PartitionSpec getPartitionSpecFromDeltaSnapshot(StructType structType) {
+    Schema schema = SparkSchemaUtil.convert(structType);
+    PartitionSpec spec =
+        SparkSchemaUtil.identitySpec(
+            schema, deltaLog.snapshot().getMetadata().getPartitionColumns());
+    return spec == null ? PartitionSpec.unpartitioned() : spec;
+  }
+
+  private StructType getStructTypeFromDeltaSnapshot() {

Review Comment:
   why not just convert directly from the Delta schema to the Iceberg schema? Here we ended up converting to Spark and then converting to Iceberg, and as a result you had to make a few Spark classes and methods public.
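   
   For illustration, a minimal sketch of what a direct Delta-to-Iceberg conversion might look like (an assumption-laden sketch: only a few primitive types are shown and field IDs are assigned naively; a real implementation would need a complete type visitor covering nested and complex types):
   
   ```java
   import io.delta.standalone.types.DataType;
   import io.delta.standalone.types.IntegerType;
   import io.delta.standalone.types.LongType;
   import io.delta.standalone.types.StringType;
   import io.delta.standalone.types.StructField;
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.types.Type;
   import org.apache.iceberg.types.Types;
   
   class DeltaToIcebergSchemaSketch {
     // Convert a Delta Lake schema straight to an Iceberg Schema, skipping Spark types.
     static Schema convert(io.delta.standalone.types.StructType deltaSchema) {
       List<Types.NestedField> fields = new ArrayList<>();
       int nextId = 1;
       for (StructField field : deltaSchema.getFields()) {
         Type icebergType = convertType(field.getDataType());
         fields.add(field.isNullable()
             ? Types.NestedField.optional(nextId++, field.getName(), icebergType)
             : Types.NestedField.required(nextId++, field.getName(), icebergType));
       }
       return new Schema(fields);
     }
   
     private static Type convertType(DataType deltaType) {
       if (deltaType instanceof StringType) {
         return Types.StringType.get();
       } else if (deltaType instanceof IntegerType) {
         return Types.IntegerType.get();
       } else if (deltaType instanceof LongType) {
         return Types.LongType.get();
       }
       // ... remaining primitive, struct, array and map types omitted in this sketch
       throw new IllegalArgumentException("Unsupported Delta type: " + deltaType);
     }
   }
   ```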





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #5331: WIP: Adding support for Delta to Iceberg migration

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on code in PR #5331:
URL: https://github.com/apache/iceberg/pull/5331#discussion_r953984527


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateDeltaLakeTableSparkAction.java:
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.VersionLog;
+import io.delta.standalone.actions.Action;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.RemoveFile;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseMigrateDeltaLakeTableActionResult;
+import org.apache.iceberg.actions.MigrateDeltaLakeTable;
+import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.expressions.LogicalExpressions;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * Takes a Delta Lake table and attempts to transform it into an Iceberg table in the same location
+ * with the same identifier. Once complete the identifier which previously referred to a non-Iceberg
+ * table will refer to the newly migrated Iceberg table.
+ */
+public class MigrateDeltaLakeTableSparkAction implements MigrateDeltaLakeTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MigrateDeltaLakeTableSparkAction.class);
+
+  private final Map<String, String> additionalProperties = Maps.newHashMap();
+  private final SparkSession spark;
+  private final DeltaLog deltaLog;
+  private final StagingTableCatalog destCatalog;
+  private final String deltaTableLocation;
+  private final Identifier newIdentifier;
+
+  MigrateDeltaLakeTableSparkAction(

Review Comment:
   Can we have a class `BaseMigrateDeltaLakeTableAction`? Lots of logic could be shared when we want to extend this feature to other engines like Flink and Trino.
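   
   Something along these lines, perhaps (a skeleton sketch only; the class name comes from this comment, but the constructor signature and abstract hooks are assumptions, not what this PR contains):
   
   ```java
   import io.delta.standalone.DeltaLog;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.actions.BaseMigrateDeltaLakeTableActionResult;
   import org.apache.iceberg.actions.MigrateDeltaLakeTable;
   
   public abstract class BaseMigrateDeltaLakeTableAction implements MigrateDeltaLakeTable {
   
     private final DeltaLog deltaLog;
   
     protected BaseMigrateDeltaLakeTableAction(Configuration conf, String deltaTableLocation) {
       this.deltaLog = DeltaLog.forTable(conf, deltaTableLocation);
     }
   
     protected DeltaLog deltaLog() {
       return deltaLog;
     }
   
     // Engine-specific subclasses (Spark, Flink, Trino) stage the destination Iceberg table
     // and convert the Delta schema; the commit-log replay stays shared in this class.
     protected abstract Table newStagedIcebergTable(io.delta.standalone.Snapshot snapshot);
   
     @Override
     public Result execute() {
       io.delta.standalone.Snapshot snapshot = deltaLog.update();
       Table icebergTable = newStagedIcebergTable(snapshot);
       long migratedFiles = replayDeltaLog(icebergTable);
       return new BaseMigrateDeltaLakeTableActionResult(migratedFiles);
     }
   
     private long replayDeltaLog(Table icebergTable) {
       // Shared logic, essentially copyFromDeltaLakeToIceberg from this PR, would live here.
       return 0L;
     }
   }
   ```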





[GitHub] [iceberg] jackye1995 commented on pull request #5331: WIP: Adding support for Delta to Iceberg migration

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on PR #5331:
URL: https://github.com/apache/iceberg/pull/5331#issuecomment-1462954705

   Closing since the PR has been merged.




[GitHub] [iceberg] ericlgoodman commented on pull request #5331: WIP: Adding support for Delta to Iceberg migration

Posted by GitBox <gi...@apache.org>.
ericlgoodman commented on PR #5331:
URL: https://github.com/apache/iceberg/pull/5331#issuecomment-1201826444

   Adding here my primary concern with this PR - and in general a concern going forward with Spark and using multiple table formats such as Delta Lake and Iceberg.
   
   Spark reads tables through whatever catalog is named by the first part of a table's identifier. There can only be one catalog per identifier prefix, and different catalogs have different capabilities. For example, the `DeltaCatalog` can read Delta Lake and generic Hive tables, and the `SparkSessionCatalog` can read Iceberg and Hive tables.
   
   In theory, in order to read from multiple table types in one Spark session, a user would initialize a `DeltaCatalog` at, say, `delta` and then the `SparkSessionCatalog` at `iceberg`. Then all their Delta Lake tables would be located at `delta.my_delta_database.my_delta_lake_table` and all their Iceberg tables at `iceberg.my_iceberg_database.my_iceberg_table`. Unfortunately, this doesn't work out of the box. Both of these catalog implementations are designed to be used by overriding the default Spark catalog, which is located at `spark_catalog`. `CatalogExtension`, from which `DeltaCatalog` and `SparkSessionCatalog` both inherit, contains a method `setDelegateCatalog(CatalogPlugin delegate)`. As the Javadoc reads:
   
   ```java
    /**
      * This will be called only once by Spark to pass in the Spark built-in session catalog, after
      * {@link #initialize(String, CaseInsensitiveStringMap)} is called.
      */
     void setDelegateCatalog(CatalogPlugin delegate);
   ```
   
   A user can fix this issue by manually calling this method during Spark setup and setting the delegate to the one in the default Spark catalog. But most users presumably are not doing this, and some users might face difficulty depending on their service provider and how much abstraction/configuration has been taken away from them during setup.
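   
   For concreteness, the setup described above would look roughly like this (a sketch; the class names are the actual `DeltaCatalog` and Iceberg `SparkSessionCatalog`, but as noted it does not work out of the box because Spark only wires the session-catalog delegate for `spark_catalog`):
   
   ```java
   // Sketch of the dual-catalog configuration discussed above.
   SparkSession spark = SparkSession.builder()
       .config("spark.sql.catalog.delta", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
       .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkSessionCatalog")
       .getOrCreate();
   
   // Intended usage, which fails without manually setting the delegate catalog:
   spark.sql("SELECT * FROM delta.my_delta_database.my_delta_lake_table");
   spark.sql("SELECT * FROM iceberg.my_iceberg_database.my_iceberg_table");
   ```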
   
   This basically means that in today's world, it doesn't seem realistic that users have a simple way to use one Spark session to read from or migrate between different table formats. Solving that might make sense to implement first, as users may find that a Delta/Iceberg/Hudi table makes sense for them in one context but another format is preferable in another.
   
   When it comes to migration, there are basically two options:
   1. Create a more abstract Catalog implementation that can read Iceberg/Delta/Hudi/Hive tables dynamically, similar to what happens in the Trino Hive connector. The connector inspects the table properties and determines at runtime whether to redirect to another connector. Similarly, a Spark catalog could simply delegate to specific catalogs if it sees certain table type specific properties.
   2. Provide an easier method for users to not have to override the default catalog for these table type specific catalog implementations. If the Delta catalog was located at `delta`, and Iceberg at `iceberg`, then users could just keep their different table types in different catalogs and migration could take an optional parameter of the new desired catalog.




[GitHub] [iceberg] jackye1995 closed pull request #5331: WIP: Adding support for Delta to Iceberg migration

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 closed pull request #5331: WIP: Adding support for Delta to Iceberg migration
URL: https://github.com/apache/iceberg/pull/5331




[GitHub] [iceberg] ericlgoodman commented on a diff in pull request #5331: WIP: Adding support for Delta to Iceberg migration

Posted by GitBox <gi...@apache.org>.
ericlgoodman commented on code in PR #5331:
URL: https://github.com/apache/iceberg/pull/5331#discussion_r952995031


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java:
##########
@@ -121,7 +121,7 @@ private MigrateTable.Result doExecute() {
 
       Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
       TableIdentifier v1BackupIdent = new TableIdentifier(backupIdent.name(), backupNamespace);
-      String stagingLocation = getMetadataLocation(icebergTable);
+      String stagingLocation = SparkTableUtil.getIcebergMetadataLocation(icebergTable);

Review Comment:
   Need to remove this change



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java:
##########
@@ -124,7 +124,7 @@ private SnapshotTable.Result doExecute() {
       ensureNameMappingPresent(icebergTable);
 
       TableIdentifier v1TableIdent = v1SourceTable().identifier();
-      String stagingLocation = getMetadataLocation(icebergTable);
+      String stagingLocation = SparkTableUtil.getIcebergMetadataLocation(icebergTable);

Review Comment:
   Same here





[GitHub] [iceberg] JonasJ-ap commented on pull request #5331: WIP: Adding support for Delta to Iceberg migration

Posted by GitBox <gi...@apache.org>.
JonasJ-ap commented on PR #5331:
URL: https://github.com/apache/iceberg/pull/5331#issuecomment-1322684304

   Hi @ericlgoodman. My name is Rushan Jiang, a CS undergrad at CMU. I am interested in learning about and contributing to this migration support. I noticed you have not updated this PR for some time. Would you mind allowing me to continue your work?
   
   I appreciate your time and consideration.




[GitHub] [iceberg] jackye1995 commented on a diff in pull request #5331: WIP: Adding support for Delta to Iceberg migration

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on code in PR #5331:
URL: https://github.com/apache/iceberg/pull/5331#discussion_r938228059


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateDeltaLakeTableSparkAction.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.actions.AddFile;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseMigrateDeltaLakeTableActionResult;
+import org.apache.iceberg.actions.MigrateDeltaLakeTable;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.SparkTypeToType;
+import org.apache.iceberg.spark.SparkTypeVisitor;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.types.Type;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.expressions.LogicalExpressions;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Some;
+import scala.collection.JavaConverters;
+
+/**
+ * Takes a Delta Lake table and attempts to transform it into an Iceberg table in the same location with the same
+ * identifier. Once complete the identifier which previously referred to a non-Iceberg table will refer to the newly
+ * migrated Iceberg table.
+ */
+public class MigrateDeltaLakeTableSparkAction
+    implements MigrateDeltaLakeTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MigrateDeltaLakeTableSparkAction.class);
+  private static final String BACKUP_SUFFIX = "_BACKUP_";
+
+  private final Map<String, String> additionalProperties = Maps.newHashMap();
+  private final SparkSession spark;
+  private final DeltaLog deltaLog;
+  private final StagingTableCatalog destCatalog;
+  private final String location;
+  private final Identifier identifier;
+  private final Identifier backupIdent;
+
+  MigrateDeltaLakeTableSparkAction(
+      SparkSession spark,
+      CatalogPlugin sourceCatalog,
+      Identifier sourceTableIdent
+  ) {
+    this.spark = spark;
+    this.destCatalog = checkDestinationCatalog(sourceCatalog);
+    this.identifier = sourceTableIdent;
+    this.backupIdent = Identifier.of(sourceTableIdent.namespace(), sourceTableIdent.name() + BACKUP_SUFFIX);
+    try {
+      CatalogTable tableMetadata = spark.sessionState().catalogManager().v1SessionCatalog()
+          .getTableMetadata(new TableIdentifier(sourceTableIdent.name()));
+      this.location = tableMetadata.location().getPath();
+      this.deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), tableMetadata.location().getPath());
+    } catch (NoSuchTableException | NoSuchDatabaseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Result execute() {
+    // Rename the table
+    renameAndBackupSourceTable();
+
+    // Get a DeltaLog instance and retrieve the partitions (if applicable) of the table
+    io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
+    List<SparkTableUtil.SparkPartition> partitions =
+        getSparkPartitionsFromDeltaSnapshot(updatedSnapshot, deltaLog.getPath());
+
+    StructType structType = getStructTypeFromDeltaSnapshot(updatedSnapshot);
+
+    Table icebergTable;
+    StagedSparkTable stagedTable = null;
+    boolean error = true;
+    try {
+      stagedTable = stageDestTable(
+              updatedSnapshot, location, destCatalog, identifier, structType, additionalProperties);
+      icebergTable = stagedTable.table();
+
+      PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(updatedSnapshot, structType);
+      String stagingLocation = SparkTableUtil.getIcebergMetadataLocation(icebergTable);
+
+      SparkTableUtil.importSparkTable(
+              spark,
+              new TableIdentifier(backupIdent.name(), Some.apply(backupIdent.namespace()[0])),
+              icebergTable,
+              stagingLocation,
+              Collections.emptyMap(),
+              false,
+              partitionSpec,
+              partitions
+      );
+
+      stagedTable.commitStagedChanges();
+      error = false;
+    } finally {
+      if (error) {
+        LOG.error("Failed to perform the migration, aborting table creation and restoring the original table");
+        restoreSourceTable(destCatalog, backupIdent, identifier);
+        if (stagedTable != null) {
+          try {
+            stagedTable.abortStagedChanges();
+          } catch (Exception abortException) {
+            LOG.error("Cannot abort staged changes", abortException);
+          }
+        }
+      }
+    }
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long numFilesMigrated = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", numFilesMigrated, identifier);
+    return new BaseMigrateDeltaLakeTableActionResult(numFilesMigrated);
+  }
+
+  private static void restoreSourceTable(StagingTableCatalog destinationCatalog,
+                                  Identifier backupIdent, Identifier sourceTableIdent) {
+    try {
+      destinationCatalog.renameTable(backupIdent, sourceTableIdent);
+    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
+      LOG.error("Cannot restore the original table, the backup table {} cannot be found", backupIdent, e);
+
+    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
+      LOG.error("Cannot restore the original table, a table with the original name exists. " +
+              "Use the backup table {} to restore the original table manually.", backupIdent, e);
+    }
+  }
+
+  private static List<SparkTableUtil.SparkPartition> getSparkPartitionsFromDeltaSnapshot(
+      io.delta.standalone.Snapshot updatedSnapshot,
+      Path deltaLogPath
+  ) {
+    return updatedSnapshot.getAllFiles()
+        .stream()
+        // Map each partition to the list of files within it
+        .collect(Collectors.groupingBy(AddFile::getPartitionValues))
+        .entrySet()
+        .stream()
+        .map(entry -> {
+              // We don't care what value we take since they will all have the same prefix.
+              // The arbitrary file will have a path that looks like "partition1/partition2/file.parquet";
+              // We're interested in the part prior to the filename
+              AddFile addFile = entry.getValue().get(0);
+              String pathBeforeFileName = addFile.getPath().substring(0, addFile.getPath().lastIndexOf("/"));
+              String fullPath = new Path(deltaLogPath, pathBeforeFileName).toString();
+
+              return new SparkTableUtil.SparkPartition(
+                  entry.getKey(), // Map containing name and values of partitions
+                  fullPath,
+                  // Delta tables only support parquet
+                  "parquet"
+              );
+        }
+        )
+        .collect(Collectors.toList());
+  }
+
+  private static PartitionSpec getPartitionSpecFromDeltaSnapshot(
+      io.delta.standalone.Snapshot updatedSnapshot,
+      StructType structType
+  ) {
+    Type converted = SparkTypeVisitor.visit(structType, new SparkTypeToType(structType));

Review Comment:
   we can use `SparkSchemaUtil.convert`
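   
   i.e. the visitor call above could collapse to something like (sketch):
   
   ```java
   // SparkSchemaUtil.convert(StructType) returns an Iceberg Schema directly.
   Schema schema = SparkSchemaUtil.convert(structType);
   PartitionSpec spec =
       SparkSchemaUtil.identitySpec(schema, updatedSnapshot.getMetadata().getPartitionColumns());
   ```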



##########
spark/v3.3/build.gradle:
##########
@@ -63,6 +63,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     implementation("org.apache.parquet:parquet-column")
     implementation("org.apache.parquet:parquet-hadoop")
 
+    implementation ("io.delta:delta-standalone_${scalaVersion}")

Review Comment:
   Can we make this `compileOnly`? Not everyone wants this as part of their Spark runtime.
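   
   i.e. something like:
   
   ```
   compileOnly("io.delta:delta-standalone_${scalaVersion}")
   ```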



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -380,7 +381,9 @@ private static Iterator<ManifestFile> buildManifest(SerializableConfiguration co
    */
   public static void importSparkTable(SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable,
                                       String stagingDir, Map<String, String> partitionFilter,
-                                      boolean checkDuplicateFiles) {
+                                      boolean checkDuplicateFiles,
+                                      PartitionSpec nullableSpec,
+                                      List<SparkPartition> nullablePartitions) {

Review Comment:
   +1, avoid changing existing public methods; we can do something like:
   
   ```
   public static void importSparkTable(SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable,
                                         String stagingDir, Map<String, String> partitionFilter,
                                         boolean checkDuplicateFiles) {
     PartitionSpec spec = SparkSchemaUtil.specForTable(spark, sourceTableIdentWithDB.unquotedString());
     ...
     importSparkTable(..., spec, sourceTablePartitions);
         
   }
   
   public static void importSparkTable(SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable,
                                         String stagingDir, Map<String, String> partitionFilter,
                                         boolean checkDuplicateFiles,
                                         PartitionSpec spec,
                                         List<SparkPartition> partitions) {
      ...
   }
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateDeltaLakeTableSparkAction.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.actions.AddFile;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseMigrateDeltaLakeTableActionResult;
+import org.apache.iceberg.actions.MigrateDeltaLakeTable;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.SparkTypeToType;
+import org.apache.iceberg.spark.SparkTypeVisitor;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.types.Type;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.expressions.LogicalExpressions;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Some;
+import scala.collection.JavaConverters;
+
+/**
+ * Takes a Delta Lake table and attempts to transform it into an Iceberg table in the same location with the same
+ * identifier. Once complete the identifier which previously referred to a non-Iceberg table will refer to the newly
+ * migrated Iceberg table.
+ */
+public class MigrateDeltaLakeTableSparkAction
+    implements MigrateDeltaLakeTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MigrateDeltaLakeTableSparkAction.class);
+  private static final String BACKUP_SUFFIX = "_BACKUP_";
+
+  private final Map<String, String> additionalProperties = Maps.newHashMap();
+  private final SparkSession spark;
+  private final DeltaLog deltaLog;
+  private final StagingTableCatalog destCatalog;
+  private final String location;
+  private final Identifier identifier;
+  private final Identifier backupIdent;
+
+  MigrateDeltaLakeTableSparkAction(
+      SparkSession spark,
+      CatalogPlugin sourceCatalog,
+      Identifier sourceTableIdent
+  ) {
+    this.spark = spark;
+    this.destCatalog = checkDestinationCatalog(sourceCatalog);
+    this.identifier = sourceTableIdent;
+    this.backupIdent = Identifier.of(sourceTableIdent.namespace(), sourceTableIdent.name() + BACKUP_SUFFIX);
+    try {
+      CatalogTable tableMetadata = spark.sessionState().catalogManager().v1SessionCatalog()
+          .getTableMetadata(new TableIdentifier(sourceTableIdent.name()));
+      this.location = tableMetadata.location().getPath();
+      this.deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), tableMetadata.location().getPath());
+    } catch (NoSuchTableException | NoSuchDatabaseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Result execute() {
+    // Rename the table
+    renameAndBackupSourceTable();
+
+    // Get a DeltaLog instance and retrieve the partitions (if applicable) of the table
+    io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
+    List<SparkTableUtil.SparkPartition> partitions =
+        getSparkPartitionsFromDeltaSnapshot(updatedSnapshot, deltaLog.getPath());
+
+    StructType structType = getStructTypeFromDeltaSnapshot(updatedSnapshot);
+
+    Table icebergTable;
+    StagedSparkTable stagedTable = null;
+    boolean error = true;
+    try {
+      stagedTable = stageDestTable(
+              updatedSnapshot, location, destCatalog, identifier, structType, additionalProperties);
+      icebergTable = stagedTable.table();
+
+      PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(updatedSnapshot, structType);
+      String stagingLocation = SparkTableUtil.getIcebergMetadataLocation(icebergTable);
+
+      SparkTableUtil.importSparkTable(
+              spark,
+              new TableIdentifier(backupIdent.name(), Some.apply(backupIdent.namespace()[0])),
+              icebergTable,
+              stagingLocation,
+              Collections.emptyMap(),
+              false,
+              partitionSpec,
+              partitions
+      );
+
+      stagedTable.commitStagedChanges();
+      error = false;
+    } finally {
+      if (error) {
+        LOG.error("Failed to perform the migration, aborting table creation and restoring the original table");
+        restoreSourceTable(destCatalog, backupIdent, identifier);
+        if (stagedTable != null) {
+          try {
+            stagedTable.abortStagedChanges();
+          } catch (Exception abortException) {
+            LOG.error("Cannot abort staged changes", abortException);
+          }
+        }
+      }
+    }
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long numFilesMigrated = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", numFilesMigrated, identifier);
+    return new BaseMigrateDeltaLakeTableActionResult(numFilesMigrated);
+  }
+
+  private static void restoreSourceTable(StagingTableCatalog destinationCatalog,
+                                  Identifier backupIdent, Identifier sourceTableIdent) {
+    try {
+      destinationCatalog.renameTable(backupIdent, sourceTableIdent);
+    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
+      LOG.error("Cannot restore the original table, the backup table {} cannot be found", backupIdent, e);
+
+    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
+      LOG.error("Cannot restore the original table, a table with the original name exists. " +
+              "Use the backup table {} to restore the original table manually.", backupIdent, e);
+    }
+  }
+
+  private static List<SparkTableUtil.SparkPartition> getSparkPartitionsFromDeltaSnapshot(
+      io.delta.standalone.Snapshot updatedSnapshot,
+      Path deltaLogPath
+  ) {
+    return updatedSnapshot.getAllFiles()
+        .stream()
+        // Map each partition to the list of files within it
+        .collect(Collectors.groupingBy(AddFile::getPartitionValues))
+        .entrySet()
+        .stream()
+        .map(entry -> {
+              // We don't care what value we take since they will all have the same prefix.
+              // The arbitrary file will have a path that looks like "partition1/partition2/file.parquet";
+              // We're interested in the part prior to the filename
+              AddFile addFile = entry.getValue().get(0);
+              String pathBeforeFileName = addFile.getPath().substring(0, addFile.getPath().lastIndexOf("/"));
+              String fullPath = new Path(deltaLogPath, pathBeforeFileName).toString();
+
+              return new SparkTableUtil.SparkPartition(
+                  entry.getKey(), // Map containing name and values of partitions
+                  fullPath,
+                  // Delta tables only support parquet
+                  "parquet"
+              );
+        }
+        )
+        .collect(Collectors.toList());
+  }
+
+  private static PartitionSpec getPartitionSpecFromDeltaSnapshot(
+      io.delta.standalone.Snapshot updatedSnapshot,
+      StructType structType
+  ) {
+    Type converted = SparkTypeVisitor.visit(structType, new SparkTypeToType(structType));
+    Schema schema = new Schema(converted.asNestedType().asStructType().fields());
+    PartitionSpec spec = SparkSchemaUtil.identitySpec(schema, updatedSnapshot.getMetadata().getPartitionColumns());
+    return spec == null ? PartitionSpec.unpartitioned() : spec;
+  }
+
+  private static StructType getStructTypeFromDeltaSnapshot(io.delta.standalone.Snapshot updatedSnapshot) {
+    io.delta.standalone.types.StructField[] fields =
+        Optional.ofNullable(updatedSnapshot.getMetadata().getSchema())
+            .map(io.delta.standalone.types.StructType::getFields)
+            .orElseThrow(() -> new RuntimeException("Cannot determine table schema!"));
+
+    // Convert from Delta StructFields to Spark StructFields
+    return new StructType(
+        Arrays.stream(fields)
+            .map(s -> new StructField(
+                    s.getName(),
+                    DataType.fromJson(s.getDataType().toJson()),
+                    s.isNullable(),
+                    Metadata.fromJson(s.getMetadata().toString())
+                )
+            )
+            .toArray(StructField[]::new)
+    );
+  }
+
+  private void renameAndBackupSourceTable() {
+    try {
+      LOG.info("Renaming {} as {} for backup", identifier, backupIdent);

Review Comment:
   "Renaming Delta Lake table ..."



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -393,16 +396,17 @@ public static void importSparkTable(SparkSession spark, TableIdentifier sourceTa
     }
 
     try {
-      PartitionSpec spec = SparkSchemaUtil.specForTable(spark, sourceTableIdentWithDB.unquotedString());
+      PartitionSpec spec = nullableSpec != null ? nullableSpec
+              : SparkSchemaUtil.specForTable(spark, sourceTableIdentWithDB.unquotedString());
 
       if (Objects.equal(spec, PartitionSpec.unpartitioned())) {
         importUnpartitionedSparkTable(spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles);
       } else {
-        List<SparkPartition> sourceTablePartitions = getPartitions(spark, sourceTableIdent,
-            partitionFilter);
-        Preconditions.checkArgument(!sourceTablePartitions.isEmpty(),
+        List<SparkPartition> partitions = nullablePartitions != null ? nullablePartitions
+                : getPartitions(spark, sourceTableIdent, partitionFilter);
+        Preconditions.checkArgument(!partitions.isEmpty(),
             "Cannot find any partitions in table %s", sourceTableIdent);
-        importSparkPartitions(spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+        importSparkPartitions(spark, partitions, targetTable, nullableSpec, stagingDir, checkDuplicateFiles);

Review Comment:
   prefer not to change the variable name if possible



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -616,6 +621,11 @@ public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, Me
     return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty(), options));
   }
 
+  public static String getIcebergMetadataLocation(Table table) {

Review Comment:
   this is hard-coded across many places; I think we can raise a separate PR just to clean that up
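
   For context, a minimal sketch of what such a helper typically boils down to (assuming
   org.apache.iceberg.Table and org.apache.iceberg.TableProperties are imported; this is
   illustrative, not necessarily the exact body in this PR):

       public static String getIcebergMetadataLocation(Table table) {
         // Fall back to "<table location>/metadata" when no explicit metadata location
         // property is set, mirroring Iceberg's default table layout.
         return table.properties().getOrDefault(
             TableProperties.WRITE_METADATA_LOCATION,
             table.location() + "/metadata");
       }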





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #5331: WIP: Adding support for Delta to Iceberg migration

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on code in PR #5331:
URL: https://github.com/apache/iceberg/pull/5331#discussion_r953979429


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateDeltaLakeTableSparkAction.java:
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.VersionLog;
+import io.delta.standalone.actions.Action;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.RemoveFile;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseMigrateDeltaLakeTableActionResult;
+import org.apache.iceberg.actions.MigrateDeltaLakeTable;
+import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.expressions.LogicalExpressions;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * Takes a Delta Lake table and attempts to transform it into an Iceberg table in the same location
+ * with the same identifier. Once complete the identifier which previously referred to a non-Iceberg
+ * table will refer to the newly migrated Iceberg table.
+ */
+public class MigrateDeltaLakeTableSparkAction implements MigrateDeltaLakeTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MigrateDeltaLakeTableSparkAction.class);
+
+  private final Map<String, String> additionalProperties = Maps.newHashMap();
+  private final SparkSession spark;
+  private final DeltaLog deltaLog;
+  private final StagingTableCatalog destCatalog;
+  private final String deltaTableLocation;
+  private final Identifier newIdentifier;
+
+  MigrateDeltaLakeTableSparkAction(
+      SparkSession spark,
+      CatalogPlugin destCatalog,
+      String deltaTableLocation,
+      Identifier newIdentifier) {
+    this.spark = spark;
+    this.destCatalog = checkDestinationCatalog(destCatalog);
+    this.newIdentifier = newIdentifier;
+    this.deltaTableLocation = deltaTableLocation;
+    this.deltaLog =
+        DeltaLog.forTable(spark.sessionState().newHadoopConf(), this.deltaTableLocation);
+  }
+
+  @Override
+  public Result execute() {
+    // Get a DeltaLog instance and retrieve the partitions (if applicable) of the table
+    io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
+
+    StructType structType = getStructTypeFromDeltaSnapshot();
+
+    StagedSparkTable stagedTable =
+        stageDestTable(
+            updatedSnapshot,
+            deltaTableLocation,
+            destCatalog,
+            newIdentifier,
+            structType,
+            additionalProperties);
+    PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(structType);
+
+    Table icebergTable = stagedTable.table();
+    copyFromDeltaLakeToIceberg(icebergTable, partitionSpec);
+
+    stagedTable.commitStagedChanges();
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long totalDataFiles =
+        Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info(
+        "Successfully loaded Iceberg metadata for {} files to {}",
+        totalDataFiles,
+        deltaTableLocation);
+    return new BaseMigrateDeltaLakeTableActionResult(totalDataFiles);
+  }
+
+  private void copyFromDeltaLakeToIceberg(Table table, PartitionSpec spec) {
+    // Get all changes starting from version 0
+    Iterator<VersionLog> it = deltaLog.getChanges(0, false);
+
+    while (it.hasNext()) {
+      VersionLog versionLog = it.next();
+      List<Action> actions = versionLog.getActions();
+
+      // We first need to iterate through to see what kind of transaction this was. There are 3
+      // cases:
+      // 1. AppendFile - when there are only AddFile instances (an INSERT on the table)
+      // 2. DeleteFiles - when there are only RemoveFile instances (a DELETE where all the records
+      // of file(s) were removed)
+      // 3. OverwriteFiles - when there are a mix of AddFile and RemoveFile (a DELETE/UPDATE)
+
+      // Create a map of Delta Lake Action (AddFile, RemoveFile, etc.) --> List<Action>
+      Map<String, List<Action>> deltaLakeActionsMap =
+          actions.stream()
+              .filter(action -> action instanceof AddFile || action instanceof RemoveFile)
+              .collect(Collectors.groupingBy(a -> a.getClass().getSimpleName()));
+      // Scan the map so that we know what type of transaction this will be in Iceberg
+      IcebergTransactionType icebergTransactionType =
+          getIcebergTransactionTypeFromDeltaActions(deltaLakeActionsMap);
+      if (icebergTransactionType == null) {
+        return;
+      }
+
+      List<DataFile> filesToAdd = Lists.newArrayList();
+      List<DataFile> filesToRemove = Lists.newArrayList();
+      for (Action action : Iterables.concat(deltaLakeActionsMap.values())) {
+        DataFile dataFile = buildDataFileForAction(action, table, spec);
+        if (action instanceof AddFile) {
+          filesToAdd.add(dataFile);
+        } else {
+          // We would have thrown an exception above if it wasn't a RemoveFile
+          filesToRemove.add(dataFile);
+        }
+      }
+
+      switch (icebergTransactionType) {
+        case APPEND_FILES:
+          AppendFiles appendFiles = table.newAppend();
+          filesToAdd.forEach(appendFiles::appendFile);
+          appendFiles.commit();
+          break;
+        case DELETE_FILES:
+          DeleteFiles deleteFiles = table.newDelete();
+          filesToRemove.forEach(deleteFiles::deleteFile);
+          deleteFiles.commit();
+          break;
+        case OVERWRITE_FILES:
+          OverwriteFiles overwriteFiles = table.newOverwrite();
+          filesToAdd.forEach(overwriteFiles::addFile);
+          filesToRemove.forEach(overwriteFiles::deleteFile);
+          overwriteFiles.commit();
+          break;
+      }
+    }
+  }
+
+  private DataFile buildDataFileForAction(Action action, Table table, PartitionSpec spec) {
+    String path;
+    long size;
+    Map<String, String> partitionValues;
+
+    if (action instanceof AddFile) {
+      AddFile addFile = (AddFile) action;
+      path = addFile.getPath();
+      size = addFile.getSize();
+      partitionValues = addFile.getPartitionValues();
+    } else if (action instanceof RemoveFile) {
+      RemoveFile removeFile = (RemoveFile) action;
+      path = removeFile.getPath();
+      size =
+          removeFile
+              .getSize()
+              .orElseThrow(
+                  () ->
+                      new RuntimeException(
+                          String.format("File %s removed without specifying a size", path)));
+      partitionValues =
+          Optional.ofNullable(removeFile.getPartitionValues())
+              .orElseThrow(
+                  () ->
+                      new RuntimeException(
+                          String.format(
+                              "File %s removed without specifying partition values", path)));
+    } else {
+      throw new IllegalStateException(
+          String.format(
+              "Unexpected action type for Delta Lake: %s", action.getClass().getSimpleName()));
+    }
+
+    String fullFilePath = deltaLog.getPath().toString() + File.separator + path;
+    Metrics metrics = getMetricsForFile(table, fullFilePath);
+
+    String partition =
+        spec.fields().stream()
+            .map(PartitionField::name)
+            .map(name -> String.format("%s=%s", name, partitionValues.get(name)))
+            .collect(Collectors.joining("/"));
+
+    return DataFiles.builder(spec)
+        .withPath(fullFilePath)
+        .withFormat(FileFormat.PARQUET)
+        .withFileSizeInBytes(size)
+        .withMetrics(metrics)
+        .withPartitionPath(partition)
+        .withRecordCount(metrics.recordCount())
+        .build();
+  }
+
+  private Metrics getMetricsForFile(Table table, String fullFilePath) {
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+    String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+    NameMapping nameMapping =
+        nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
+    return TableMigrationUtil.getParquetMetrics(

Review Comment:
   we should add a check to make sure the file is indeed Parquet
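
   A minimal sketch of such a check, run before computing the metrics (the helper name is
   hypothetical; Preconditions refers to Iceberg's relocated Guava class and java.util.Locale
   is assumed to be imported):

       private static void checkParquetFile(String fullFilePath) {
         // Delta Lake data files are expected to be Parquet; fail fast with a clear
         // message instead of letting getParquetMetrics surface a less obvious error.
         Preconditions.checkArgument(
             fullFilePath.toLowerCase(Locale.ROOT).endsWith(".parquet"),
             "Cannot migrate file %s: only Parquet data files are supported", fullFilePath);
       }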





[GitHub] [iceberg] ericlgoodman commented on pull request #5331: WIP: Adding support for Delta to Iceberg migration

Posted by GitBox <gi...@apache.org>.
ericlgoodman commented on PR #5331:
URL: https://github.com/apache/iceberg/pull/5331#issuecomment-1329331585

   > Hi @ericlgoodman. My name is Rushan Jiang, a CS undergrad at CMU. I am interested in learning about and contributing to this migration support. I noticed this PR has not been updated for some time. Would you mind allowing me to continue your work?
   > 
   > I appreciate your time and consideration.
   
   Followed up with you on the Iceberg Slack.




[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5331: WIP: Adding support for Delta to Iceberg migration

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5331:
URL: https://github.com/apache/iceberg/pull/5331#discussion_r931828413


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java:
##########
@@ -293,7 +293,7 @@ private static PartitionSpec identitySpec(Schema schema, Collection<Column> colu
     return identitySpec(schema, names);
   }
 
-  private static PartitionSpec identitySpec(Schema schema, List<String> partitionNames) {
+  public static PartitionSpec identitySpec(Schema schema, List<String> partitionNames) {

Review Comment:
   Why the change in access modifiers?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateDeltaLakeTableSparkAction.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.actions.AddFile;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseMigrateDeltaLakeTableActionResult;
+import org.apache.iceberg.actions.MigrateDeltaLakeTable;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.SparkTypeToType;
+import org.apache.iceberg.spark.SparkTypeVisitor;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.types.Type;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.expressions.LogicalExpressions;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Some;
+import scala.collection.JavaConverters;
+
+/**
+ * Takes a Delta Lake table and attempts to transform it into an Iceberg table in the same location with the same
+ * identifier. Once complete the identifier which previously referred to a non-Iceberg table will refer to the newly
+ * migrated Iceberg table.
+ */
+public class MigrateDeltaLakeTableSparkAction
+    implements MigrateDeltaLakeTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MigrateDeltaLakeTableSparkAction.class);
+  private static final String BACKUP_SUFFIX = "_BACKUP_";
+
+  private final Map<String, String> additionalProperties = Maps.newHashMap();
+  private final SparkSession spark;
+  private final DeltaLog deltaLog;
+  private final StagingTableCatalog destCatalog;
+  private final String location;
+  private final Identifier identifier;
+  private final Identifier backupIdent;
+
+  MigrateDeltaLakeTableSparkAction(
+      SparkSession spark,
+      CatalogPlugin sourceCatalog,
+      Identifier sourceTableIdent
+  ) {
+    this.spark = spark;
+    this.destCatalog = checkDestinationCatalog(sourceCatalog);
+    this.identifier = sourceTableIdent;
+    this.backupIdent = Identifier.of(sourceTableIdent.namespace(), sourceTableIdent.name() + BACKUP_SUFFIX);
+    try {
+      CatalogTable tableMetadata = spark.sessionState().catalogManager().v1SessionCatalog()
+          .getTableMetadata(new TableIdentifier(sourceTableIdent.name()));
+      this.location = tableMetadata.location().getPath();
+      this.deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), tableMetadata.location().getPath());
+    } catch (NoSuchTableException | NoSuchDatabaseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Result execute() {
+    // Rename the table
+    renameAndBackupSourceTable();
+
+    // Get a DeltaLog instance and retrieve the partitions (if applicable) of the table
+    io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
+    List<SparkTableUtil.SparkPartition> partitions =
+        getSparkPartitionsFromDeltaSnapshot(updatedSnapshot, deltaLog.getPath());
+
+    StructType structType = getStructTypeFromDeltaSnapshot(updatedSnapshot);
+
+    Table icebergTable;
+    StagedSparkTable stagedTable = null;
+    boolean error = true;
+    try {
+      stagedTable = stageDestTable(
+              updatedSnapshot, location, destCatalog, identifier, structType, additionalProperties);
+      icebergTable = stagedTable.table();
+
+      PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(updatedSnapshot, structType);
+      String stagingLocation = SparkTableUtil.getIcebergMetadataLocation(icebergTable);
+
+      SparkTableUtil.importSparkTable(
+              spark,
+              new TableIdentifier(backupIdent.name(), Some.apply(backupIdent.namespace()[0])),
+              icebergTable,
+              stagingLocation,
+              Collections.emptyMap(),
+              false,
+              partitionSpec,
+              partitions
+      );
+
+      stagedTable.commitStagedChanges();
+      error = false;
+    } finally {
+      if (error) {
+        LOG.error("Failed to perform the migration, aborting table creation and restoring the original table");
+        restoreSourceTable(destCatalog, backupIdent, identifier);
+        if (stagedTable != null) {
+          try {
+            stagedTable.abortStagedChanges();
+          } catch (Exception abortException) {
+            LOG.error("Cannot abort staged changes", abortException);
+          }
+        }
+      }
+    }
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long numFilesMigrated = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", numFilesMigrated, identifier);
+    return new BaseMigrateDeltaLakeTableActionResult(numFilesMigrated);
+  }
+
+  private static void restoreSourceTable(StagingTableCatalog destinationCatalog,
+                                  Identifier backupIdent, Identifier sourceTableIdent) {
+    try {
+      destinationCatalog.renameTable(backupIdent, sourceTableIdent);
+    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
+      LOG.error("Cannot restore the original table, the backup table {} cannot be found", backupIdent, e);
+
+    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
+      LOG.error("Cannot restore the original table, a table with the original name exists. " +
+              "Use the backup table {} to restore the original table manually.", backupIdent, e);
+    }
+  }
+
+  private static List<SparkTableUtil.SparkPartition> getSparkPartitionsFromDeltaSnapshot(
+      io.delta.standalone.Snapshot updatedSnapshot,
+      Path deltaLogPath
+  ) {
+    return updatedSnapshot.getAllFiles()
+        .stream()
+        // Map each partition to the list of files within it
+        .collect(Collectors.groupingBy(AddFile::getPartitionValues))
+        .entrySet()
+        .stream()
+        .map(entry -> {
+              // We don't care what value we take since they will all have the same prefix.
+              // The arbitrary file will have a path that looks like "partition1/partition2/file.parquet";
+              // we're interested in the part prior to the filename.
+              AddFile addFile = entry.getValue().get(0);
+              String pathBeforeFileName = addFile.getPath().substring(0, addFile.getPath().lastIndexOf("/"));
+              String fullPath = new Path(deltaLogPath, pathBeforeFileName).toString();
+
+              return new SparkTableUtil.SparkPartition(
+                  entry.getKey(), // Map containing name and values of partitions
+                  fullPath,
+                  // Delta tables only support parquet
+                  "parquet"
+              );
+        }
+        )
+        .collect(Collectors.toList());
+  }
+
+  private static PartitionSpec getPartitionSpecFromDeltaSnapshot(

Review Comment:
   I get what the function is doing, but does it need to be static?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateDeltaLakeTableSparkAction.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.actions.AddFile;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseMigrateDeltaLakeTableActionResult;
+import org.apache.iceberg.actions.MigrateDeltaLakeTable;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.SparkTypeToType;
+import org.apache.iceberg.spark.SparkTypeVisitor;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.types.Type;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.expressions.LogicalExpressions;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Some;
+import scala.collection.JavaConverters;
+
+/**
+ * Takes a Delta Lake table and attempts to transform it into an Iceberg table in the same location with the same
+ * identifier. Once complete the identifier which previously referred to a non-Iceberg table will refer to the newly
+ * migrated Iceberg table.
+ */
+public class MigrateDeltaLakeTableSparkAction
+    implements MigrateDeltaLakeTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MigrateDeltaLakeTableSparkAction.class);
+  private static final String BACKUP_SUFFIX = "_BACKUP_";
+
+  private final Map<String, String> additionalProperties = Maps.newHashMap();
+  private final SparkSession spark;
+  private final DeltaLog deltaLog;
+  private final StagingTableCatalog destCatalog;
+  private final String location;
+  private final Identifier identifier;
+  private final Identifier backupIdent;
+
+  MigrateDeltaLakeTableSparkAction(
+      SparkSession spark,
+      CatalogPlugin sourceCatalog,
+      Identifier sourceTableIdent
+  ) {
+    this.spark = spark;
+    this.destCatalog = checkDestinationCatalog(sourceCatalog);
+    this.identifier = sourceTableIdent;
+    this.backupIdent = Identifier.of(sourceTableIdent.namespace(), sourceTableIdent.name() + BACKUP_SUFFIX);
+    try {
+      CatalogTable tableMetadata = spark.sessionState().catalogManager().v1SessionCatalog()
+          .getTableMetadata(new TableIdentifier(sourceTableIdent.name()));
+      this.location = tableMetadata.location().getPath();
+      this.deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), tableMetadata.location().getPath());
+    } catch (NoSuchTableException | NoSuchDatabaseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Result execute() {
+    // Rename the table
+    renameAndBackupSourceTable();
+
+    // Get a DeltaLog instance and retrieve the partitions (if applicable) of the table
+    io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
+    List<SparkTableUtil.SparkPartition> partitions =
+        getSparkPartitionsFromDeltaSnapshot(updatedSnapshot, deltaLog.getPath());
+
+    StructType structType = getStructTypeFromDeltaSnapshot(updatedSnapshot);
+
+    Table icebergTable;
+    StagedSparkTable stagedTable = null;
+    boolean error = true;
+    try {
+      stagedTable = stageDestTable(
+              updatedSnapshot, location, destCatalog, identifier, structType, additionalProperties);
+      icebergTable = stagedTable.table();
+
+      PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(updatedSnapshot, structType);
+      String stagingLocation = SparkTableUtil.getIcebergMetadataLocation(icebergTable);
+
+      SparkTableUtil.importSparkTable(
+              spark,
+              new TableIdentifier(backupIdent.name(), Some.apply(backupIdent.namespace()[0])),
+              icebergTable,
+              stagingLocation,
+              Collections.emptyMap(),
+              false,
+              partitionSpec,
+              partitions
+      );
+
+      stagedTable.commitStagedChanges();
+      error = false;
+    } finally {
+      if (error) {
+        LOG.error("Failed to perform the migration, aborting table creation and restoring the original table");
+        restoreSourceTable(destCatalog, backupIdent, identifier);
+        if (stagedTable != null) {
+          try {
+            stagedTable.abortStagedChanges();
+          } catch (Exception abortException) {
+            LOG.error("Cannot abort staged changes", abortException);
+          }
+        }
+      }
+    }
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long numFilesMigrated = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", numFilesMigrated, identifier);
+    return new BaseMigrateDeltaLakeTableActionResult(numFilesMigrated);
+  }
+
+  private static void restoreSourceTable(StagingTableCatalog destinationCatalog,
+                                  Identifier backupIdent, Identifier sourceTableIdent) {
+    try {
+      destinationCatalog.renameTable(backupIdent, sourceTableIdent);
+    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
+      LOG.error("Cannot restore the original table, the backup table {} cannot be found", backupIdent, e);
+
+    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
+      LOG.error("Cannot restore the original table, a table with the original name exists. " +
+              "Use the backup table {} to restore the original table manually.", backupIdent, e);
+    }
+  }
+
+  private static List<SparkTableUtil.SparkPartition> getSparkPartitionsFromDeltaSnapshot(

Review Comment:
   same as above: does it need to be static?



##########
core/src/main/java/org/apache/iceberg/actions/BaseMigrateDeltaLakeTableActionResult.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+public class BaseMigrateDeltaLakeTableActionResult implements MigrateDeltaLakeTable.Result {
+
+  private final long numFilesImported;
+
+  public BaseMigrateDeltaLakeTableActionResult(long numFilesImported) {

Review Comment:
   Would it make sense for the result to also capture aspects like the total data size and how long the migration procedure took?
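
   Roughly what that could look like as additions to this class (field and accessor names
   are hypothetical, and the MigrateDeltaLakeTable.Result interface would need matching
   methods):

       private final long totalDataSizeInBytes;
       private final long migrationDurationMillis;

       public BaseMigrateDeltaLakeTableActionResult(
           long numFilesImported, long totalDataSizeInBytes, long migrationDurationMillis) {
         this.numFilesImported = numFilesImported;
         this.totalDataSizeInBytes = totalDataSizeInBytes;
         this.migrationDurationMillis = migrationDurationMillis;
       }

       public long totalDataSizeInBytes() {
         return totalDataSizeInBytes;
       }

       public long migrationDurationMillis() {
         return migrationDurationMillis;
       }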



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateDeltaLakeTableSparkAction.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.actions.AddFile;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseMigrateDeltaLakeTableActionResult;
+import org.apache.iceberg.actions.MigrateDeltaLakeTable;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.SparkTypeToType;
+import org.apache.iceberg.spark.SparkTypeVisitor;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.types.Type;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.expressions.LogicalExpressions;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Some;
+import scala.collection.JavaConverters;
+
+/**
+ * Takes a Delta Lake table and attempts to transform it into an Iceberg table in the same location with the same
+ * identifier. Once complete the identifier which previously referred to a non-Iceberg table will refer to the newly
+ * migrated Iceberg table.
+ */
+public class MigrateDeltaLakeTableSparkAction
+    implements MigrateDeltaLakeTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MigrateDeltaLakeTableSparkAction.class);
+  private static final String BACKUP_SUFFIX = "_BACKUP_";
+
+  private final Map<String, String> additionalProperties = Maps.newHashMap();
+  private final SparkSession spark;
+  private final DeltaLog deltaLog;
+  private final StagingTableCatalog destCatalog;
+  private final String location;
+  private final Identifier identifier;
+  private final Identifier backupIdent;
+
+  MigrateDeltaLakeTableSparkAction(
+      SparkSession spark,
+      CatalogPlugin sourceCatalog,
+      Identifier sourceTableIdent
+  ) {
+    this.spark = spark;
+    this.destCatalog = checkDestinationCatalog(sourceCatalog);
+    this.identifier = sourceTableIdent;
+    this.backupIdent = Identifier.of(sourceTableIdent.namespace(), sourceTableIdent.name() + BACKUP_SUFFIX);
+    try {
+      CatalogTable tableMetadata = spark.sessionState().catalogManager().v1SessionCatalog()
+          .getTableMetadata(new TableIdentifier(sourceTableIdent.name()));
+      this.location = tableMetadata.location().getPath();
+      this.deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), tableMetadata.location().getPath());
+    } catch (NoSuchTableException | NoSuchDatabaseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Result execute() {
+    // Rename the table
+    renameAndBackupSourceTable();
+
+    // Get a DeltaLog instance and retrieve the partitions (if applicable) of the table
+    io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
+    List<SparkTableUtil.SparkPartition> partitions =
+        getSparkPartitionsFromDeltaSnapshot(updatedSnapshot, deltaLog.getPath());
+
+    StructType structType = getStructTypeFromDeltaSnapshot(updatedSnapshot);
+
+    Table icebergTable;
+    StagedSparkTable stagedTable = null;
+    boolean error = true;
+    try {
+      stagedTable = stageDestTable(
+              updatedSnapshot, location, destCatalog, identifier, structType, additionalProperties);
+      icebergTable = stagedTable.table();
+
+      PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(updatedSnapshot, structType);
+      String stagingLocation = SparkTableUtil.getIcebergMetadataLocation(icebergTable);
+
+      SparkTableUtil.importSparkTable(
+              spark,
+              new TableIdentifier(backupIdent.name(), Some.apply(backupIdent.namespace()[0])),
+              icebergTable,
+              stagingLocation,
+              Collections.emptyMap(),
+              false,
+              partitionSpec,
+              partitions
+      );
+
+      stagedTable.commitStagedChanges();
+      error = false;
+    } finally {
+      if (error) {
+        LOG.error("Failed to perform the migration, aborting table creation and restoring the original table");
+        restoreSourceTable(destCatalog, backupIdent, identifier);
+        if (stagedTable != null) {
+          try {
+            stagedTable.abortStagedChanges();
+          } catch (Exception abortException) {
+            LOG.error("Cannot abort staged changes", abortException);
+          }
+        }
+      }
+    }
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long numFilesMigrated = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", numFilesMigrated, identifier);
+    return new BaseMigrateDeltaLakeTableActionResult(numFilesMigrated);
+  }
+
+  private static void restoreSourceTable(StagingTableCatalog destinationCatalog,
+                                  Identifier backupIdent, Identifier sourceTableIdent) {
+    try {
+      destinationCatalog.renameTable(backupIdent, sourceTableIdent);
+    } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
+      LOG.error("Cannot restore the original table, the backup table {} cannot be found", backupIdent, e);
+
+    } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
+      LOG.error("Cannot restore the original table, a table with the original name exists. " +
+              "Use the backup table {} to restore the original table manually.", backupIdent, e);
+    }
+  }
+
+  private static List<SparkTableUtil.SparkPartition> getSparkPartitionsFromDeltaSnapshot(
+      io.delta.standalone.Snapshot updatedSnapshot,
+      Path deltaLogPath
+  ) {
+    return updatedSnapshot.getAllFiles()
+        .stream()
+        // Map each partition to the list of files within it
+        .collect(Collectors.groupingBy(AddFile::getPartitionValues))
+        .entrySet()
+        .stream()
+        .map(entry -> {
+              // We don't care what value we take since they will all have the same prefix.
+              // The arbitrary file will have a path that looks like "partition1/partition2/file.parquet";
+              // we're interested in the part prior to the filename.
+              AddFile addFile = entry.getValue().get(0);
+              String pathBeforeFileName = addFile.getPath().substring(0, addFile.getPath().lastIndexOf("/"));
+              String fullPath = new Path(deltaLogPath, pathBeforeFileName).toString();
+
+              return new SparkTableUtil.SparkPartition(
+                  entry.getKey(), // Map containing name and values of partitions
+                  fullPath,
+                  // Delta tables only support parquet
+                  "parquet"
+              );
+        }
+        )
+        .collect(Collectors.toList());
+  }

Review Comment:
   Should we parallelize this computation? For example, build the grouping by partition values concurrently across files. It could be overkill (depends on the number of files in the snapshot)
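
   If it does turn out to be worthwhile, one possible way to keep the same grouping but run
   it concurrently using only standard java.util.stream APIs (java.util.concurrent.ConcurrentMap
   assumed to be imported; whether this helps depends on the file count):

       ConcurrentMap<Map<String, String>, List<AddFile>> filesByPartition =
           updatedSnapshot.getAllFiles()
               .parallelStream()
               // Group files by partition values; the concurrent collector merges
               // results from multiple threads into a single map.
               .collect(Collectors.groupingByConcurrent(AddFile::getPartitionValues));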



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -380,7 +381,9 @@ private static Iterator<ManifestFile> buildManifest(SerializableConfiguration co
    */
   public static void importSparkTable(SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable,
                                       String stagingDir, Map<String, String> partitionFilter,
-                                      boolean checkDuplicateFiles) {
+                                      boolean checkDuplicateFiles,
+                                      PartitionSpec nullableSpec,
+                                      List<SparkPartition> nullablePartitions) {

Review Comment:
   I get that we need to pass in the computed partition spec and Spark partitions, but I don't think we should change any public API signatures; we can add a new one instead.
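
   For example, the existing six-argument signature could stay as-is and delegate to the new
   variant (a sketch only; it assumes the eight-argument overload from the diff above, where
   passing null for both new arguments preserves the current behavior):

       public static void importSparkTable(SparkSession spark, TableIdentifier sourceTableIdent,
                                           Table targetTable, String stagingDir,
                                           Map<String, String> partitionFilter,
                                           boolean checkDuplicateFiles) {
         // Keep the existing behavior: the spec and partitions are derived from the
         // source table inside the new overload when these arguments are null.
         importSparkTable(spark, sourceTableIdent, targetTable, stagingDir, partitionFilter,
             checkDuplicateFiles, null, null);
       }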


