Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/04/07 22:33:26 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request #2437: Spark: Refactor snapshot and migrate actions

aokolnychyi opened a new pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437


   This PR moves our snapshot and migrate actions to use the new API.
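For readers new to the actions API, here is a simplified, dependency-free sketch of the fluent shape that the diffs in this thread rely on. It uses a generic base class parameterized by the concrete action type, plus a `self()` hook, so shared setters return the subclass type. The method names `as`, `tableProperty`, `tableProperties`, `execute` and `self` mirror methods visible in the diffs. Everything else (`ActionApiSketch`, `BaseAction`, `SnapshotSketch`, `SnapshotResult`) is hypothetical and is not Iceberg's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a self-typed fluent action API; not Iceberg's actual classes.
public class ActionApiSketch {

  // Shared base: ThisT is the concrete action type, R is the type execute() returns.
  abstract static class BaseAction<ThisT, R> {
    private final Map<String, String> props = new HashMap<>();

    // Subclasses return `this`, so shared setters keep the subclass type in a chain.
    protected abstract ThisT self();

    public ThisT tableProperty(String key, String value) {
      props.put(key, value);
      return self();
    }

    public ThisT tableProperties(Map<String, String> properties) {
      props.putAll(properties);
      return self();
    }

    protected Map<String, String> props() {
      return props;
    }

    public abstract R execute();
  }

  // Hypothetical result: a file count plus the properties that were applied.
  static final class SnapshotResult {
    final long importedDataFilesCount;
    final Map<String, String> appliedProps;

    SnapshotResult(long importedDataFilesCount, Map<String, String> appliedProps) {
      this.importedDataFilesCount = importedDataFilesCount;
      this.appliedProps = appliedProps;
    }
  }

  static final class SnapshotSketch extends BaseAction<SnapshotSketch, SnapshotResult> {
    private final String sourceIdent;
    private String destIdent;

    SnapshotSketch(String sourceIdent) {
      this.sourceIdent = sourceIdent;
    }

    @Override
    protected SnapshotSketch self() {
      return this;
    }

    // Snapshot-specific setter for the destination identifier.
    public SnapshotSketch as(String ident) {
      this.destIdent = ident;
      return this;
    }

    @Override
    public SnapshotResult execute() {
      if (destIdent == null) {
        throw new IllegalArgumentException("Destination for " + sourceIdent + " not set; call as(...) first");
      }
      // A real action would import the source files here; the sketch imports nothing.
      return new SnapshotResult(0L, new HashMap<>(props()));
    }
  }

  public static void main(String[] args) {
    // tableProperty(...) returns SnapshotSketch, so as(...) is still callable afterwards.
    SnapshotResult result = new SnapshotSketch("db.src").tableProperty("k", "v").as("db.dest").execute();
    System.out.println(result.importedDataFilesCount + " " + result.appliedProps);
  }
}
```

The self type is what allows a shared setter such as `tableProperty` to be followed by a snapshot-only call such as `as` without a cast.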


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r609113702



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableMigrationSparkAction.java
##########
@@ -191,16 +182,4 @@ protected String getMetadataLocation(Table table) {
   protected abstract Map<String, String> targetTableProps();
 
   protected abstract TableCatalog checkSourceCatalog(CatalogPlugin catalog);
-
-  protected <T> T withJobGroupInfo(JobGroupInfo info, Supplier<T> supplier) {

Review comment:
       Consume from `BaseSparkAction` now.
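The comment above refers to the `withJobGroupInfo(JobGroupInfo info, Supplier<T> supplier)` helper. The subclass no longer defines it and instead inherits it from `BaseSparkAction`. Below is a minimal dependency-free sketch of that arrangement. It assumes the helper's job is to run the supplier under a job group and then restore the previously active group. `JobGroupSketch`, the `ThreadLocal` stand-in for Spark's job-group state, and the two-field `JobGroupInfo` are hypothetical simplifications, not Iceberg's code.

```java
import java.util.function.Supplier;

// Hypothetical sketch; the real helper lives in Iceberg's BaseSparkAction and targets Spark.
public class JobGroupSketch {

  // Simplified stand-in for JobGroupInfo: just a group id and a description.
  static final class JobGroupInfo {
    final String groupId;
    final String description;

    JobGroupInfo(String groupId, String description) {
      this.groupId = groupId;
      this.description = description;
    }
  }

  // Stand-in for Spark's per-thread job-group state.
  static final ThreadLocal<JobGroupInfo> CURRENT_GROUP = new ThreadLocal<>();

  // Shared parent: defines the helper once so every action can consume it.
  abstract static class BaseAction {
    protected <T> T withJobGroupInfo(JobGroupInfo info, Supplier<T> supplier) {
      JobGroupInfo previous = CURRENT_GROUP.get();
      CURRENT_GROUP.set(info);
      try {
        return supplier.get();
      } finally {
        // Restore whatever group was active before, even if the supplier threw.
        if (previous == null) {
          CURRENT_GROUP.remove();
        } else {
          CURRENT_GROUP.set(previous);
        }
      }
    }
  }

  // A subclass no longer redefines the helper; it just calls the inherited one.
  static final class MigrateSketch extends BaseAction {
    String execute() {
      JobGroupInfo info = new JobGroupInfo("MIGRATE-TABLE", "MIGRATE-TABLE");
      return withJobGroupInfo(info, () -> "ran in group " + CURRENT_GROUP.get().groupId);
    }
  }

  public static void main(String[] args) {
    System.out.println(new MigrateSketch().execute());
  }
}
```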






[GitHub] [iceberg] aokolnychyi commented on pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#issuecomment-816466466


   Hmm, I think your analysis is right, @pvary. It looks like we are hitting timeouts, though. The latest CI job on this PR, for example, failed after hitting the 360-minute timeout.




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r610897117



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableMigrationSparkAction.java
##########
@@ -99,20 +93,17 @@
     this.sourceTableLocation = CatalogUtils.URIToString(sourceCatalogTable.storage().locationUri().get());
   }
 
-  @Override
-  public CreateAction withProperties(Map<String, String> properties) {
-    this.additionalProperties.putAll(properties);
-    return this;
+  protected void setDestCatalogAndIdent(CatalogPlugin catalog, Identifier ident) {

Review comment:
       I think this was needed before, since Snapshot and Migrate descended from the CreateTable actions. Now I don't think you need the dest catalog at all.






[GitHub] [iceberg] aokolnychyi commented on pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#issuecomment-815997140


   I see more and more Tez-related failures on recent PRs.
   
   ```
   org.apache.iceberg.mr.hive.TestHiveIcebergStorageHandlerWithEngine > testCBOWithSelectedColumnsOverlapJoin[fileFormat=AVRO, engine=tez, catalog=HIVE_CATALOG] FAILED
       java.lang.IllegalArgumentException: Failed to execute Hive query 'SELECT c.first_name, o.order_id FROM default.orders o JOIN default.customers c ON o.customer_id = c.customer_id ORDER BY o.order_id DESC': Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask
           at org.apache.iceberg.mr.hive.TestHiveShell.executeStatement(TestHiveShell.java:151)
           at org.apache.iceberg.mr.hive.TestHiveIcebergStorageHandlerWithEngine.testCBOWithSelectedColumnsOverlapJoin(TestHiveIcebergStorageHandlerWithEngine.java:217)
   ```
   
   cc @pvary @marton-bod 




[GitHub] [iceberg] aokolnychyi commented on pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#issuecomment-818414215


   Thanks for reviewing, @RussellSpitzer!




[GitHub] [iceberg] aokolnychyi commented on pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#issuecomment-815306356


   cc @RussellSpitzer 




[GitHub] [iceberg] aokolnychyi commented on pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#issuecomment-816467708


   Looks like the job I pointed to also failed with a timeout.




[GitHub] [iceberg] pvary commented on pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
pvary commented on pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#issuecomment-816474659


   > Looks like the job I pointed to also failed with a timeout.
   
   Maybe this is related to resource problems when trying to create the Tez session?
   
   When we create a new TezAM, we ask YARN for a new container. Maybe the issue is that we do not get the new YARN container (because of missing resources) and keep waiting until the timeout is reached. It might be a good idea to add `@Timeout` to the tests.
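The suggestion is to bound each test's run time. A test that never gets a YARN container would then fail quickly instead of consuming the whole 360-minute CI budget. The exact annotation depends on the test framework in use. As a framework-independent illustration, here is a minimal sketch that bounds a blocking call with `Future.get(timeout, unit)`. `runWithTimeout` is a hypothetical helper, not part of the Iceberg test suite, and the sleeping task only simulates a wait that never completes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: bounds a blocking call so a stuck wait fails fast.
public class TimeoutSketch {

  static <T> T runWithTimeout(Callable<T> task, long timeoutMillis) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<T> future = executor.submit(task);
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupt the stuck task
      throw new IllegalStateException("Timed out after " + timeoutMillis + " ms", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Task failed", e.getCause());
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    try {
      // Simulates waiting for a container that never gets allocated.
      runWithTimeout(() -> {
        Thread.sleep(10_000);
        return "container";
      }, 200);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
    System.out.println(runWithTimeout(() -> "ok", 1_000));
  }
}
```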




[GitHub] [iceberg] aokolnychyi merged pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437


   




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r610882145



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Map;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseSnapshotTableActionResult;
+import org.apache.iceberg.actions.SnapshotTable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * Creates a new Iceberg table based on a source Spark table. The new Iceberg table will
+ * have a different data and metadata directory allowing it to exist independently of the
+ * source table.
+ */
+public class BaseSnapshotTableSparkAction
+    extends BaseTableMigrationSparkAction<SnapshotTable, SnapshotTable.Result>
+    implements SnapshotTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseSnapshotTableSparkAction.class);
+
+  private String destTableLocation = null;
+
+  public BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent,
+                                      CatalogPlugin destCatalog, Identifier destTableIdent) {
+    super(spark, sourceCatalog, sourceTableIdent, destCatalog, destTableIdent);
+  }
+
+  @Override
+  protected SnapshotTable self() {
+    return this;
+  }
+
+  @Override
+  public SnapshotTable as(String ident) {
+    String ctx = "snapshot destination";
+    CatalogPlugin defaultCatalog = spark().sessionState().catalogManager().currentCatalog();
+    CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark(), ident, defaultCatalog);
+    setDestCatalogAndIdent(catalogAndIdent.catalog(), catalogAndIdent.identifier());
+    return this;
+  }
+
+  @Override
+  public SnapshotTable tableProperties(Map<String, String> properties) {
+    setProperties(properties);
+    return this;
+  }
+
+  @Override
+  public SnapshotTable tableProperty(String property, String value) {
+    setProperty(property, value);
+    return this;
+  }
+
+  @Override
+  public SnapshotTable.Result execute() {
+    JobGroupInfo info = newJobGroupInfo("SNAPSHOT-TABLE", "SNAPSHOT-TABLE");
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private SnapshotTable.Result doExecute() {
+    Preconditions.checkArgument(destCatalog() != null && destTableIdent() != null,
+        "The destination catalog and identifier cannot be null. " +
+        "Make sure to configure the action with a valid destination table identifier via the `as` method.");
+
+    LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
+    StagedSparkTable stagedTable = stageDestTable();
+    Table icebergTable = stagedTable.table();
+
+    // TODO: Check the dest table location does not overlap with the source table location
+
+    boolean threw = true;
+    try {
+      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
+      ensureNameMappingPresent(icebergTable);
+
+      TableIdentifier v1TableIdent = v1SourceTable().identifier();
+      String stagingLocation = getMetadataLocation(icebergTable);
+      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
+      SparkTableUtil.importSparkTable(spark(), v1TableIdent, icebergTable, stagingLocation);
+
+      LOG.info("Committing staged changes to {}", destTableIdent());
+      stagedTable.commitStagedChanges();
+      threw = false;
+    } finally {
+      if (threw) {
+        LOG.error("Error when populating the staged table with metadata, aborting changes");
+
+        try {
+          stagedTable.abortStagedChanges();
+        } catch (Exception abortException) {
+          LOG.error("Cannot abort staged changes", abortException);
+        }
+      }
+    }
+
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long importedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", importedDataFilesCount, destTableIdent());
+    return new BaseSnapshotTableActionResult(importedDataFilesCount);
+  }
+
+  @Override
+  protected Map<String, String> targetTableProps() {

Review comment:
       Rebased and applied.
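`doExecute` in the diff above uses a `threw` flag with `finally`. It stages a table, populates it, commits it, and aborts the staged changes if anything failed. Below is a stripped-down sketch of just that control flow. A hypothetical in-memory `StagedTable` stands in for `StagedSparkTable`, and a `Runnable` stands in for `SparkTableUtil.importSparkTable(...)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the stage -> populate -> commit / abort-on-failure flow.
public class StagedCommitSketch {

  // Stand-in for a staged table; it only records which terminal call happened.
  static final class StagedTable {
    final List<String> events = new ArrayList<>();

    void commitStagedChanges() {
      events.add("commit");
    }

    void abortStagedChanges() {
      events.add("abort");
    }
  }

  // Runs the import, commits on success, and aborts if the import or the commit threw.
  static void populateAndCommit(StagedTable staged, Runnable importFiles) {
    boolean threw = true;
    try {
      importFiles.run();
      staged.commitStagedChanges();
      threw = false;
    } finally {
      if (threw) {
        try {
          staged.abortStagedChanges();
        } catch (RuntimeException abortException) {
          // Never let an abort failure mask the original exception (the diff logs it instead).
        }
      }
    }
  }

  public static void main(String[] args) {
    StagedTable ok = new StagedTable();
    populateAndCommit(ok, () -> { });
    System.out.println(ok.events);

    StagedTable failed = new StagedTable();
    try {
      populateAndCommit(failed, () -> {
        throw new IllegalStateException("import failed");
      });
    } catch (IllegalStateException e) {
      System.out.println(failed.events + " after: " + e.getMessage());
    }
  }
}
```

Compare this with catching `Exception` and rethrowing. The flag-plus-`finally` form also runs the abort path for `Error`s, and it leaves the original exception untouched.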






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r609137045



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+public class BaseSnapshotTableActionResult implements SnapshotTable.Result {
+
+  private final long importedDataFilesCount;
+
+  public BaseSnapshotTableActionResult(long importedDataFilesCount) {
+    this.importedDataFilesCount = importedDataFilesCount;
+  }
+
+  @Override
+  public long importedDataFilesCount() {
+    return importedDataFilesCount;

Review comment:
       Not for this PR, but now that I think about it, we should probably also let the user know how many metadata files were created.
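Below is a minimal sketch of what the suggested result shape could look like. It assumes the new count is carried alongside `importedDataFilesCount` in the same immutable result object. `createdMetadataFilesCount` is a hypothetical name, and how an action would compute it is outside the scope of this PR. The values in `main` are examples only.

```java
// Hypothetical sketch of the suggested follow-up; not Iceberg's actual Result interface.
public class SnapshotResultSketch {

  interface Result {
    long importedDataFilesCount();

    // Hypothetical addition: number of metadata files the action wrote.
    long createdMetadataFilesCount();
  }

  static final class BaseResult implements Result {
    private final long importedDataFilesCount;
    private final long createdMetadataFilesCount;

    BaseResult(long importedDataFilesCount, long createdMetadataFilesCount) {
      this.importedDataFilesCount = importedDataFilesCount;
      this.createdMetadataFilesCount = createdMetadataFilesCount;
    }

    @Override
    public long importedDataFilesCount() {
      return importedDataFilesCount;
    }

    @Override
    public long createdMetadataFilesCount() {
      return createdMetadataFilesCount;
    }
  }

  public static void main(String[] args) {
    // Example values only.
    Result result = new BaseResult(120L, 3L);
    System.out.println(result.importedDataFilesCount() + " data files, "
        + result.createdMetadataFilesCount() + " metadata files");
  }
}
```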






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r611818097



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableMigrationSparkAction.java
##########
@@ -99,20 +93,17 @@
     this.sourceTableLocation = CatalogUtils.URIToString(sourceCatalogTable.storage().locationUri().get());
   }
 
-  @Override
-  public CreateAction withProperties(Map<String, String> properties) {
-    this.additionalProperties.putAll(properties);
-    return this;
+  protected void setDestCatalogAndIdent(CatalogPlugin catalog, Identifier ident) {

Review comment:
       This is still the parent class where we need to reference the dest catalog. However, I think your idea makes sense. We can probably remove the dest fields from the parent class as they are initialized differently now. Then we don't have to do that weird if statement in the constructor.
   
   I've pushed an update. Let me know if you prefer the old approach.
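One possible reading of the change described above is shown below as a dependency-free sketch. The parent class keeps only the source identifier, and each subclass decides where its destination comes from: migrate reuses the source, and snapshot receives it via `as(...)`. The parent constructor therefore needs no special-case branch. The class names and plain-string identifiers are hypothetical simplifications, and the pushed update itself is not shown in this thread.

```java
import java.util.Objects;

// Hypothetical sketch: the parent keeps only source state; each subclass supplies the destination.
public class DestFieldsSketch {

  abstract static class BaseMigrationAction {
    private final String sourceIdent;

    BaseMigrationAction(String sourceIdent) {
      this.sourceIdent = sourceIdent;
    }

    protected String sourceIdent() {
      return sourceIdent;
    }

    // No dest fields and no branching in the constructor; subclasses answer this instead.
    protected abstract String destIdent();

    String describe() {
      return sourceIdent() + " -> " + destIdent();
    }
  }

  // Migrate replaces the table in place, so the destination is the source itself.
  static final class Migrate extends BaseMigrationAction {
    Migrate(String sourceIdent) {
      super(sourceIdent);
    }

    @Override
    protected String destIdent() {
      return sourceIdent();
    }
  }

  // Snapshot learns its destination later, through as(...).
  static final class Snapshot extends BaseMigrationAction {
    private String destIdent;

    Snapshot(String sourceIdent) {
      super(sourceIdent);
    }

    Snapshot as(String ident) {
      this.destIdent = Objects.requireNonNull(ident, "ident");
      return this;
    }

    @Override
    protected String destIdent() {
      return destIdent;
    }
  }

  public static void main(String[] args) {
    System.out.println(new Migrate("db.t").describe());
    System.out.println(new Snapshot("db.src").as("db.copy").describe());
  }
}
```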






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r609112776



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Map;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseMigrateTableActionResult;
+import org.apache.iceberg.actions.MigrateTable;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Some;
+import scala.collection.JavaConverters;
+
+/**
+ * Takes a Spark table in the source catalog and attempts to transform it into an Iceberg
+ * table in the same location with the same identifier. Once complete the identifier which
+ * previously referred to a non-Iceberg table will refer to the newly migrated Iceberg
+ * table.
+ */
+public class BaseMigrateTableSparkAction
+    extends BaseTableMigrationSparkAction<MigrateTable, MigrateTable.Result>
+    implements MigrateTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseMigrateTableSparkAction.class);
+  private static final String BACKUP_SUFFIX = "_BACKUP_";
+
+  private final Identifier backupIdent;
+
+  public BaseMigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+    super(spark, sourceCatalog, sourceTableIdent, sourceCatalog, sourceTableIdent);
+    String backupName = sourceTableIdent().name() + BACKUP_SUFFIX;
+    this.backupIdent = Identifier.of(sourceTableIdent().namespace(), backupName);
+  }
+
+  @Override
+  protected MigrateTable self() {
+    return this;
+  }
+
+  @Override
+  public MigrateTable tableProperties(Map<String, String> properties) {
+    setProperties(properties);
+    return this;
+  }
+
+  @Override
+  public MigrateTable tableProperty(String property, String value) {
+    setProperty(property, value);
+    return this;
+  }
+
+  @Override
+  public MigrateTable.Result execute() {
+    JobGroupInfo info = newJobGroupInfo("MIGRATE-TABLE", "MIGRATE-TABLE");
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private MigrateTable.Result doExecute() {
+    LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
+
+    // move the source table to a new name, halting all modifications and allowing us to stage
+    // the creation of a new Iceberg table in its place
+    renameAndBackupSourceTable();
+
+    StagedSparkTable stagedTable = null;
+    Table icebergTable;
+    boolean threw = true;
+    try {
+      LOG.info("Staging a new Iceberg table {}", destTableIdent());
+      stagedTable = stageDestTable();
+      icebergTable = stagedTable.table();
+
+      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
+      ensureNameMappingPresent(icebergTable);
+
+      Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
+      TableIdentifier v1BackupIdent = new TableIdentifier(backupIdent.name(), backupNamespace);
+      String stagingLocation = getMetadataLocation(icebergTable);
+      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
+      SparkTableUtil.importSparkTable(spark(), v1BackupIdent, icebergTable, stagingLocation);
+
+      LOG.info("Committing staged changes to {}", destTableIdent());
+      stagedTable.commitStagedChanges();
+      threw = false;
+    } finally {
+      if (threw) {
+        LOG.error("Failed to perform the migration, aborting table creation and restoring the original table");
+
+        restoreSourceTable();
+
+        if (stagedTable != null) {
+          try {
+            stagedTable.abortStagedChanges();
+          } catch (Exception abortException) {
+            LOG.error("Cannot abort staged changes", abortException);
+          }
+        }
+      }
+    }
+
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long migratedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", migratedDataFilesCount, destTableIdent());
+    return new BaseMigrateTableActionResult(migratedDataFilesCount);
+  }
+
+  @Override
+  protected Map<String, String> targetTableProps() {

Review comment:
       I kept the existing behavior without changes in #2420.
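The migrate flow in the diff above has three steps:

1. Rename the source to `<name>_BACKUP_`.
2. Stage and commit the Iceberg table under the original identifier.
3. Restore the source if anything fails.

Below is a hypothetical in-memory sketch of that ordering. A `HashMap` stands in for the catalog, `catalog.put(name, "iceberg")` stands in for committing the staged table, and a boolean flag simulates an import failure. What happens to the backup after a successful migration is not shown in this thread, so the sketch simply leaves it in place.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory sketch of migrate's rename-to-backup / restore-on-failure flow.
public class MigrateBackupSketch {
  static final String BACKUP_SUFFIX = "_BACKUP_";

  // Stand-in catalog mapping a table name to its format.
  final Map<String, String> catalog = new HashMap<>();

  void rename(String from, String to) {
    if (catalog.containsKey(to)) {
      throw new IllegalStateException("Table already exists: " + to);
    }
    String table = catalog.remove(from);
    if (table == null) {
      throw new IllegalStateException("No such table: " + from);
    }
    catalog.put(to, table);
  }

  // Moves the source aside, creates the new table under the original name,
  // and moves the source back if anything fails before the commit.
  void migrate(String name, boolean failDuringImport) {
    String backupName = name + BACKUP_SUFFIX;
    rename(name, backupName); // frees the original name and halts modifications to the source

    boolean threw = true;
    try {
      if (failDuringImport) {
        throw new IllegalStateException("import failed");
      }
      catalog.put(name, "iceberg"); // stands in for commitStagedChanges()
      threw = false;
    } finally {
      if (threw) {
        rename(backupName, name); // restore the original table
      }
    }
  }

  public static void main(String[] args) {
    MigrateBackupSketch ok = new MigrateBackupSketch();
    ok.catalog.put("db.t", "hive");
    ok.migrate("db.t", false);
    System.out.println(ok.catalog);

    MigrateBackupSketch failed = new MigrateBackupSketch();
    failed.catalog.put("db.t", "hive");
    try {
      failed.migrate("db.t", true);
    } catch (IllegalStateException e) {
      System.out.println(failed.catalog + " after: " + e.getMessage());
    }
  }
}
```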






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r609150565



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+public class BaseSnapshotTableActionResult implements SnapshotTable.Result {
+
+  private final long importedDataFilesCount;
+
+  public BaseSnapshotTableActionResult(long importedDataFilesCount) {
+    this.importedDataFilesCount = importedDataFilesCount;
+  }
+
+  @Override
+  public long importedDataFilesCount() {
+    return importedDataFilesCount;

Review comment:
       #2439






[GitHub] [iceberg] aokolnychyi commented on pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#issuecomment-816466612


   I'll try to see what causes the timeout tomorrow.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r610882046



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Map;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseMigrateTableActionResult;
+import org.apache.iceberg.actions.MigrateTable;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Some;
+import scala.collection.JavaConverters;
+
+/**
+ * Takes a Spark table in the source catalog and attempts to transform it into an Iceberg
+ * table in the same location with the same identifier. Once complete the identifier which
+ * previously referred to a non-Iceberg table will refer to the newly migrated Iceberg
+ * table.
+ */
+public class BaseMigrateTableSparkAction
+    extends BaseTableMigrationSparkAction<MigrateTable, MigrateTable.Result>
+    implements MigrateTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseMigrateTableSparkAction.class);
+  private static final String BACKUP_SUFFIX = "_BACKUP_";
+
+  private final Identifier backupIdent;
+
+  public BaseMigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+    super(spark, sourceCatalog, sourceTableIdent, sourceCatalog, sourceTableIdent);
+    String backupName = sourceTableIdent().name() + BACKUP_SUFFIX;
+    this.backupIdent = Identifier.of(sourceTableIdent().namespace(), backupName);
+  }
+
+  @Override
+  protected MigrateTable self() {
+    return this;
+  }
+
+  @Override
+  public MigrateTable tableProperties(Map<String, String> properties) {
+    setProperties(properties);
+    return this;
+  }
+
+  @Override
+  public MigrateTable tableProperty(String property, String value) {
+    setProperty(property, value);
+    return this;
+  }
+
+  @Override
+  public MigrateTable.Result execute() {
+    JobGroupInfo info = newJobGroupInfo("MIGRATE-TABLE", "MIGRATE-TABLE");
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private MigrateTable.Result doExecute() {
+    LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
+
+    // move the source table to a new name, halting all modifications and allowing us to stage
+    // the creation of a new Iceberg table in its place
+    renameAndBackupSourceTable();
+
+    StagedSparkTable stagedTable = null;
+    Table icebergTable;
+    boolean threw = true;
+    try {
+      LOG.info("Staging a new Iceberg table {}", destTableIdent());
+      stagedTable = stageDestTable();
+      icebergTable = stagedTable.table();
+
+      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
+      ensureNameMappingPresent(icebergTable);
+
+      Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
+      TableIdentifier v1BackupIdent = new TableIdentifier(backupIdent.name(), backupNamespace);
+      String stagingLocation = getMetadataLocation(icebergTable);
+      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
+      SparkTableUtil.importSparkTable(spark(), v1BackupIdent, icebergTable, stagingLocation);
+
+      LOG.info("Committing staged changes to {}", destTableIdent());
+      stagedTable.commitStagedChanges();
+      threw = false;
+    } finally {
+      if (threw) {
+        LOG.error("Failed to perform the migration, aborting table creation and restoring the original table");
+
+        restoreSourceTable();
+
+        if (stagedTable != null) {
+          try {
+            stagedTable.abortStagedChanges();
+          } catch (Exception abortException) {
+            LOG.error("Cannot abort staged changes", abortException);
+          }
+        }
+      }
+    }
+
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long migratedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", migratedDataFilesCount, destTableIdent());
+    return new BaseMigrateTableActionResult(migratedDataFilesCount);
+  }
+
+  @Override
+  protected Map<String, String> targetTableProps() {

Review comment:
       Rebased and updated.
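The migrate hunk above moves the source table to a backup name, stages the Iceberg table, and, if anything throws, restores the source and aborts the staged table from a `finally` block guarded by a `threw` flag. Below is a minimal sketch of only that control flow. The `Steps` interface and its method names are hypothetical stand-ins for the Spark/Iceberg calls, not real APIs.

```java
/**
 * Sketch of the rollback flow in BaseMigrateTableSparkAction.doExecute().
 * The Steps methods are hypothetical stand-ins, not Iceberg or Spark APIs.
 */
final class MigrateRollbackSketch {

  interface Steps {
    void renameSourceToBackup(); // stands in for renameAndBackupSourceTable()
    void restoreSource();        // stands in for restoreSourceTable()
    void stage();                // stands in for stageDestTable()
    void importMetadata();       // stands in for SparkTableUtil.importSparkTable(...)
    void commitStaged();         // stands in for stagedTable.commitStagedChanges()
    void abortStaged();          // stands in for stagedTable.abortStagedChanges()
  }

  private MigrateRollbackSketch() {
  }

  static void run(Steps steps) {
    // The source table is moved aside first so nothing can modify it during the migration.
    steps.renameSourceToBackup();

    boolean staged = false;
    boolean threw = true;
    try {
      steps.stage();
      staged = true;
      steps.importMetadata();
      steps.commitStaged();
      threw = false;
    } finally {
      if (threw) {
        // Put the original table back under its old name before cleaning up.
        steps.restoreSource();
        if (staged) {
          try {
            steps.abortStaged();
          } catch (RuntimeException abortException) {
            // Swallowed here (the real code logs it) so the original failure propagates.
          }
        }
      }
    }
  }
}
```

Using a flag plus `finally`, rather than a `catch`, lets the original exception propagate unchanged. A failure during the abort is only recorded, not rethrown.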


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r610896522



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableMigrationSparkAction.java
##########
@@ -99,20 +93,17 @@
     this.sourceTableLocation = CatalogUtils.URIToString(sourceCatalogTable.storage().locationUri().get());
   }
 
-  @Override
-  public CreateAction withProperties(Map<String, String> properties) {
-    this.additionalProperties.putAll(properties);
-    return this;
+  protected void setDestCatalogAndIdent(CatalogPlugin catalog, Identifier ident) {

Review comment:
       We let you use "as" with Migration? Maybe I'm forgetting how this worked.


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r609112915



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Map;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseSnapshotTableActionResult;
+import org.apache.iceberg.actions.SnapshotTable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * Creates a new Iceberg table based on a source Spark table. The new Iceberg table will
+ * have a different data and metadata directory allowing it to exist independently of the
+ * source table.
+ */
+public class BaseSnapshotTableSparkAction
+    extends BaseTableMigrationSparkAction<SnapshotTable, SnapshotTable.Result>
+    implements SnapshotTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseSnapshotTableSparkAction.class);
+
+  private String destTableLocation = null;
+
+  public BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent,
+                                      CatalogPlugin destCatalog, Identifier destTableIdent) {
+    super(spark, sourceCatalog, sourceTableIdent, destCatalog, destTableIdent);
+  }
+
+  @Override
+  protected SnapshotTable self() {
+    return this;
+  }
+
+  @Override
+  public SnapshotTable as(String ident) {
+    String ctx = "snapshot destination";
+    CatalogPlugin defaultCatalog = spark().sessionState().catalogManager().currentCatalog();
+    CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark(), ident, defaultCatalog);
+    setDestCatalogAndIdent(catalogAndIdent.catalog(), catalogAndIdent.identifier());
+    return this;
+  }
+
+  @Override
+  public SnapshotTable tableProperties(Map<String, String> properties) {
+    setProperties(properties);
+    return this;
+  }
+
+  @Override
+  public SnapshotTable tableProperty(String property, String value) {
+    setProperty(property, value);
+    return this;
+  }
+
+  @Override
+  public SnapshotTable.Result execute() {
+    JobGroupInfo info = newJobGroupInfo("SNAPSHOT-TABLE", "SNAPSHOT-TABLE");
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private SnapshotTable.Result doExecute() {
+    Preconditions.checkArgument(destCatalog() != null && destTableIdent() != null,
+        "The destination catalog and identifier cannot be null. " +
+        "Make sure to configure the action with a valid destination table identifier via the `as` method.");
+
+    LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
+    StagedSparkTable stagedTable = stageDestTable();
+    Table icebergTable = stagedTable.table();
+
+    // TODO: Check the dest table location does not overlap with the source table location
+
+    boolean threw = true;
+    try {
+      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
+      ensureNameMappingPresent(icebergTable);
+
+      TableIdentifier v1TableIdent = v1SourceTable().identifier();
+      String stagingLocation = getMetadataLocation(icebergTable);
+      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
+      SparkTableUtil.importSparkTable(spark(), v1TableIdent, icebergTable, stagingLocation);
+
+      LOG.info("Committing staged changes to {}", destTableIdent());
+      stagedTable.commitStagedChanges();
+      threw = false;
+    } finally {
+      if (threw) {
+        LOG.error("Error when populating the staged table with metadata, aborting changes");
+
+        try {
+          stagedTable.abortStagedChanges();
+        } catch (Exception abortException) {
+          LOG.error("Cannot abort staged changes", abortException);
+        }
+      }
+    }
+
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long importedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", importedDataFilesCount, destTableIdent());
+    return new BaseSnapshotTableActionResult(importedDataFilesCount);
+  }
+
+  @Override
+  protected Map<String, String> targetTableProps() {

Review comment:
       Here as well. No changes from #2420.
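In the version of this file quoted in this thread, the snapshot action's `targetTableProps()` starts from the source table's properties. It removes location-related and excluded keys, sets fixed values (Iceberg provider, `gc.enabled=false` because the data files still belong to the source table, and `snapshot=true`), then applies user-supplied properties and finally an explicit location. The sketch below reproduces only that precedence with plain maps. The string values used for the `TableProperties`/`TableCatalog` constants are assumptions and should be checked against the Iceberg and Spark versions in use.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of the property precedence in targetTableProps(); key values are assumptions. */
final class SnapshotPropsSketch {
  static final String LOCATION = "location";
  static final String METADATA_LOCATION = "write.metadata.path";   // assumed value
  static final String DATA_LOCATION = "write.folder-storage.path"; // assumed value
  static final String PROVIDER = "provider";                       // assumed value
  static final String GC_ENABLED = "gc.enabled";                   // assumed value
  static final List<String> EXCLUDED =
      List.of("path", "transient_lastDdlTime", "serialization.format");

  private SnapshotPropsSketch() {
  }

  static Map<String, String> targetProps(Map<String, String> sourceProps,
                                         Map<String, String> additionalProps,
                                         String destLocation) {
    Map<String, String> props = new HashMap<>(sourceProps);
    // 1. Drop anything that would point the snapshot at the source table's directories.
    props.remove(LOCATION);
    props.remove(METADATA_LOCATION);
    props.remove(DATA_LOCATION);
    EXCLUDED.forEach(props::remove);
    // 2. Fixed settings: Iceberg provider, no GC of data files owned by the source table.
    props.put(PROVIDER, "iceberg");
    props.put(GC_ENABLED, "false");
    props.put("snapshot", "true");
    // 3. User-supplied properties are applied last, so they win over steps 1 and 2.
    props.putAll(additionalProps);
    // 4. An explicit destination location, if set, overrides everything else.
    if (destLocation != null) {
      props.put(LOCATION, destLocation);
    }
    return props;
  }
}
```

Because additional properties are applied after the fixed ones, a caller-supplied value for one of those keys would take effect.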


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r609113558



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableMigrationSparkAction.java
##########
@@ -99,20 +93,17 @@
     this.sourceTableLocation = CatalogUtils.URIToString(sourceCatalogTable.storage().locationUri().get());
   }
 
-  @Override
-  public CreateAction withProperties(Map<String, String> properties) {
-    this.additionalProperties.putAll(properties);
-    return this;
+  protected void setDestCatalogAndIdent(CatalogPlugin catalog, Identifier ident) {

Review comment:
       Called from `as`.
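In the snapshot action quoted in this thread, `as(String)` resolves the string into a catalog and an identifier through `Spark3Util.catalogAndIdentifier` and then calls `setDestCatalogAndIdent`. The sketch below is a simplified, hypothetical version of that resolution. The rules are an assumption: a bare name uses the current catalog and namespace, a leading part that names a known catalog selects that catalog, and otherwise the current catalog is used. The real helper parses names with Spark's parser and looks catalogs up through Spark's catalog manager, and this sketch does neither.

```java
import java.util.Arrays;
import java.util.Set;

/** Hypothetical, simplified catalog/identifier resolution; quoting of name parts is ignored. */
final class CatalogAndIdentSketch {

  static final class Resolved {
    final String catalog;
    final String[] namespace;
    final String name;

    Resolved(String catalog, String[] namespace, String name) {
      this.catalog = catalog;
      this.namespace = namespace;
      this.name = name;
    }
  }

  private CatalogAndIdentSketch() {
  }

  static Resolved resolve(String ident, Set<String> catalogs,
                          String currentCatalog, String[] currentNamespace) {
    String[] parts = ident == null ? new String[0] : ident.split("\\.");
    if (parts.length == 0 || parts[0].isEmpty()) {
      throw new IllegalArgumentException("Invalid identifier: " + ident);
    }
    String last = parts[parts.length - 1];

    if (parts.length == 1) {
      // A bare table name lives in the current catalog and namespace.
      return new Resolved(currentCatalog, currentNamespace, last);
    }

    if (catalogs.contains(parts[0])) {
      // "catalog.ns.table": the first part names a registered catalog.
      return new Resolved(parts[0], Arrays.copyOfRange(parts, 1, parts.length - 1), last);
    }

    // "ns.table": every part but the last is the namespace in the current catalog.
    return new Resolved(currentCatalog, Arrays.copyOfRange(parts, 0, parts.length - 1), last);
  }
}
```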


[GitHub] [iceberg] aokolnychyi commented on pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#issuecomment-816026667


   @pvary, here is one run
   
   https://github.com/apache/iceberg/pull/2362/checks?check_run_id=2290454777


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r609113436



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableMigrationSparkAction.java
##########
@@ -70,19 +64,19 @@
   private final Identifier sourceTableIdent;
 
   // Destination Fields
-  private final StagingTableCatalog destCatalog;
-  private final Identifier destTableIdent;
+  private StagingTableCatalog destCatalog;

Review comment:
      We initialize the dest table later via the `as` method.
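Because the destination is only known after construction, the destination fields can no longer be `final`, and `execute()` has to fail fast when `as` was never called. The snapshot action quoted in this thread does this with a `Preconditions.checkArgument` in `doExecute()`. The builder below is a hypothetical, stripped-down illustration of that pattern, not the Iceberg class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical builder: the destination is set after construction via as(...). */
final class SnapshotBuilderSketch {
  private final String sourceTable;
  private final Map<String, String> properties = new HashMap<>();
  private String destTable; // assigned later by as(...), so it cannot be final

  SnapshotBuilderSketch(String sourceTable) {
    this.sourceTable = sourceTable;
  }

  SnapshotBuilderSketch as(String ident) {
    this.destTable = ident;
    return this;
  }

  SnapshotBuilderSketch tableProperty(String key, String value) {
    properties.put(key, value);
    return this;
  }

  String execute() {
    // Same fail-fast idea as the precondition in BaseSnapshotTableSparkAction.doExecute().
    if (destTable == null) {
      throw new IllegalArgumentException(
          "The destination identifier cannot be null. Configure it via the `as` method.");
    }
    return sourceTable + " -> " + destTable + " " + properties;
  }
}
```

Usage: `new SnapshotBuilderSketch("db.src").as("db.src_snap").tableProperty("owner", "etl").execute()` returns `db.src -> db.src_snap {owner=etl}`. Calling `execute()` without `as` throws.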


[GitHub] [iceberg] pvary commented on pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
pvary commented on pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#issuecomment-816441761


   > @pvary, here is one run
   > 
   > https://github.com/apache/iceberg/pull/2362/checks?check_run_id=2290454777
   
   I see this at the end of the logs:
   ```
   2021-04-08T01:04:25.1453911Z ##[error]The operation was canceled.
   ```
   
   Was it cancelled manually, or was it cancelled automatically because of the failures?
   
   I am asking because some time ago (in #1789) I introduced a feature that collects the test logs in exactly this situation (flaky test failures on CI). It creates a `test logs` artifact for failed runs, but no such artifact is available for this run.
   
   If the run was cancelled automatically, then I have to check what changed in the build process. OTOH, if it was cancelled manually, then I need to find a non-cancelled run with the same failures.
   
   Thanks,
   Peter


[GitHub] [iceberg] aokolnychyi commented on pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#issuecomment-818226150


   Agree, @RussellSpitzer. Let me fix it.


[GitHub] [iceberg] aokolnychyi commented on pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#issuecomment-816951367


   Rebased this one. @RussellSpitzer, could you take a look?


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r611853308



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseTableMigrationSparkAction.java
##########
@@ -53,37 +49,28 @@
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 
-abstract class Spark3CreateAction implements CreateAction {
+abstract class BaseTableMigrationSparkAction<ThisT, R> extends BaseSparkAction<ThisT, R> {
   private static final Set<String> ALLOWED_SOURCES = ImmutableSet.of("parquet", "avro", "orc", "hive");
   protected static final String LOCATION = "location";
   protected static final String ICEBERG_METADATA_FOLDER = "metadata";
   protected static final List<String> EXCLUDED_PROPERTIES =
       ImmutableList.of("path", "transient_lastDdlTime", "serialization.format");
 
-  private final SparkSession spark;
-
   // Source Fields
   private final V1Table sourceTable;
   private final CatalogTable sourceCatalogTable;
   private final String sourceTableLocation;
   private final TableCatalog sourceCatalog;
   private final Identifier sourceTableIdent;
 
-  // Destination Fields
-  private final StagingTableCatalog destCatalog;
-  private final Identifier destTableIdent;
-
   // Optional Parameters for destination
   private final Map<String, String> additionalProperties = Maps.newHashMap();
 
-  Spark3CreateAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent,
-                     CatalogPlugin destCatalog, Identifier destTableIdent) {
+  BaseTableMigrationSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {

Review comment:
      I liked having a name here that is neither "migrate" nor "snapshot", because I didn't want an error during a snapshot to show up in the stack trace looking like a migration.


[GitHub] [iceberg] pvary commented on pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
pvary commented on pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#issuecomment-816020471


   > I see more and more Tez-related failures on recent PRs.
   
   Could you please provide a link for any of the failed test runs? I would like to get the logs for the failed tests.
   
   Thanks,
   Peter


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r609113272



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Map;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseSnapshotTableActionResult;
+import org.apache.iceberg.actions.SnapshotTable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * Creates a new Iceberg table based on a source Spark table. The new Iceberg table will
+ * have a different data and metadata directory allowing it to exist independently of the
+ * source table.
+ */
+public class BaseSnapshotTableSparkAction
+    extends BaseTableMigrationSparkAction<SnapshotTable, SnapshotTable.Result>
+    implements SnapshotTable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseSnapshotTableSparkAction.class);
+
+  private String destTableLocation = null;
+
+  public BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent,
+                                      CatalogPlugin destCatalog, Identifier destTableIdent) {
+    super(spark, sourceCatalog, sourceTableIdent, destCatalog, destTableIdent);
+  }
+
+  @Override
+  protected SnapshotTable self() {
+    return this;
+  }
+
+  @Override
+  public SnapshotTable as(String ident) {
+    String ctx = "snapshot destination";
+    CatalogPlugin defaultCatalog = spark().sessionState().catalogManager().currentCatalog();
+    CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark(), ident, defaultCatalog);
+    setDestCatalogAndIdent(catalogAndIdent.catalog(), catalogAndIdent.identifier());
+    return this;
+  }
+
+  @Override
+  public SnapshotTable tableProperties(Map<String, String> properties) {
+    setProperties(properties);
+    return this;
+  }
+
+  @Override
+  public SnapshotTable tableProperty(String property, String value) {
+    setProperty(property, value);
+    return this;
+  }
+
+  @Override
+  public SnapshotTable.Result execute() {
+    JobGroupInfo info = newJobGroupInfo("SNAPSHOT-TABLE", "SNAPSHOT-TABLE");
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private SnapshotTable.Result doExecute() {
+    Preconditions.checkArgument(destCatalog() != null && destTableIdent() != null,
+        "The destination catalog and identifier cannot be null. " +
+        "Make sure to configure the action with a valid destination table identifier via the `as` method.");
+
+    LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
+    StagedSparkTable stagedTable = stageDestTable();
+    Table icebergTable = stagedTable.table();
+
+    // TODO: Check the dest table location does not overlap with the source table location
+
+    boolean threw = true;
+    try {
+      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
+      ensureNameMappingPresent(icebergTable);
+
+      TableIdentifier v1TableIdent = v1SourceTable().identifier();
+      String stagingLocation = getMetadataLocation(icebergTable);
+      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
+      SparkTableUtil.importSparkTable(spark(), v1TableIdent, icebergTable, stagingLocation);
+
+      LOG.info("Committing staged changes to {}", destTableIdent());
+      stagedTable.commitStagedChanges();
+      threw = false;
+    } finally {
+      if (threw) {
+        LOG.error("Error when populating the staged table with metadata, aborting changes");
+
+        try {
+          stagedTable.abortStagedChanges();
+        } catch (Exception abortException) {
+          LOG.error("Cannot abort staged changes", abortException);
+        }
+      }
+    }
+
+    Snapshot snapshot = icebergTable.currentSnapshot();
+    long importedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", importedDataFilesCount, destTableIdent());
+    return new BaseSnapshotTableActionResult(importedDataFilesCount);
+  }
+
+  @Override
+  protected Map<String, String> targetTableProps() {
+    Map<String, String> properties = Maps.newHashMap();
+
+    // Remove any possible location properties from origin properties
+    properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
+    properties.remove(LOCATION);
+    properties.remove(TableProperties.WRITE_METADATA_LOCATION);
+    properties.remove(TableProperties.WRITE_NEW_DATA_LOCATION);
+
+    EXCLUDED_PROPERTIES.forEach(properties::remove);
+    properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
+    properties.put(TableProperties.GC_ENABLED, "false");
+    properties.put("snapshot", "true");
+    properties.putAll(additionalProperties());
+
+    // Don't use the default location for the destination table if an alternate has been set
+    if (destTableLocation != null) {
+      properties.put(LOCATION, destTableLocation);
+    }
+
+    return properties;
+  }
+
+  @Override
+  protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
+    // currently the import code relies on being able to look up the table in the session catalog
+    Preconditions.checkArgument(catalog.name().equalsIgnoreCase("spark_catalog"),
+        "Cannot snapshot a table that isn't in the session catalog (i.e. spark_catalog). " +
+        "Found source catalog: %s.", catalog.name());
+
+    Preconditions.checkArgument(catalog instanceof TableCatalog,
+        "Cannot snapshot as catalog %s of class %s is not a table catalog",
+        catalog.name(), catalog.getClass().getName());
+
+    return (TableCatalog) catalog;
+  }
+
+  @Override
+  public SnapshotTable tableLocation(String location) {

Review comment:
       This check will have to be refined separately.
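The check being discussed is not visible in the truncated hunk. The `TODO` earlier in the same file asks to verify that the destination location does not overlap with the source location. Below is a hypothetical sketch of such an overlap test on normalized URI strings. The helper name and the comparison rules are assumptions. Schemes and authorities are compared literally, so `s3://` and `s3a://` paths to the same bucket are treated as different locations.

```java
import java.net.URI;

/** Hypothetical helper: does one table location equal or contain the other? */
final class LocationOverlapSketch {
  private LocationOverlapSketch() {
  }

  static boolean overlaps(String first, String second) {
    String a = normalize(first);
    String b = normalize(second);
    // Compare with a trailing "/" so that ".../src" does not match ".../src_snap".
    return a.equals(b) || a.startsWith(b + "/") || b.startsWith(a + "/");
  }

  private static String normalize(String location) {
    // Resolves "." and ".." segments; URI.create throws on strings that are not valid URIs.
    String path = URI.create(location).normalize().toString();
    // Drop trailing slashes so "/a/b/" and "/a/b" compare equal.
    while (path.endsWith("/")) {
      path = path.substring(0, path.length() - 1);
    }
    return path;
  }
}
```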


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2437: Spark: Refactor snapshot and migrate actions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2437:
URL: https://github.com/apache/iceberg/pull/2437#discussion_r609141336



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+public class BaseSnapshotTableActionResult implements SnapshotTable.Result {
+
+  private final long importedDataFilesCount;
+
+  public BaseSnapshotTableActionResult(long importedDataFilesCount) {
+    this.importedDataFilesCount = importedDataFilesCount;
+  }
+
+  @Override
+  public long importedDataFilesCount() {
+    return importedDataFilesCount;

Review comment:
       Good idea! Now that we have the result interface, we can evolve it. Could you create an issue for this, @RussellSpitzer? 
   
   @flyrain, would you be interested in picking it up?
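The suggestion itself is not quoted in this message, so the sketch below does not try to reproduce it. It only illustrates, in generic Java, why a result interface is easy to evolve. A new accessor added as a `default` method keeps existing implementations such as `BaseSnapshotTableActionResult` compiling. The interface name and the `skippedFilesCount` accessor are hypothetical.

```java
/** Mirrors the shape of SnapshotTable.Result; the second accessor is hypothetical. */
interface SnapshotResultSketch {
  long importedDataFilesCount();

  // Added later as a default method, so existing implementations still compile.
  default long skippedFilesCount() {
    return 0L;
  }
}

/** Written against the original one-method interface and left untouched. */
final class LegacyResult implements SnapshotResultSketch {
  private final long importedDataFilesCount;

  LegacyResult(long importedDataFilesCount) {
    this.importedDataFilesCount = importedDataFilesCount;
  }

  @Override
  public long importedDataFilesCount() {
    return importedDataFilesCount;
  }
}
```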