You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/07 18:57:38 UTC

[iceberg] branch master updated: Spark: Refine logging in snapshot and migrate actions (#2431)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 27eaa79  Spark: Refine logging in snapshot and migrate actions (#2431)
27eaa79 is described below

commit 27eaa796530f56eb99d111e6110be3e1a6eb99e7
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Apr 7 11:57:27 2021 -0700

    Spark: Refine logging in snapshot and migrate actions (#2431)
---
 .../java/org/apache/iceberg/actions/Spark3MigrateAction.java   |  8 ++++++--
 .../java/org/apache/iceberg/actions/Spark3SnapshotAction.java  | 10 ++++++----
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java b/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
index 6a7ff58..acfe18c 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
@@ -60,6 +60,8 @@ public class Spark3MigrateAction extends Spark3CreateAction {
   }
 
   private Long doExecute() {
+    LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
+
     // Move source table to a new name, halting all modifications and allowing us to stage
     // the creation of a new Iceberg table in its place
     renameAndBackupSourceTable();
@@ -68,18 +70,20 @@ public class Spark3MigrateAction extends Spark3CreateAction {
     Table icebergTable;
     boolean threw = true;
     try {
+      LOG.info("Staging a new Iceberg table {}", destTableIdent());
       stagedTable = stageDestTable();
       icebergTable = stagedTable.table();
 
+      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
       ensureNameMappingPresent(icebergTable);
 
       String stagingLocation = getMetadataLocation(icebergTable);
-
-      LOG.info("Beginning migration of {} using metadata location {}", sourceTableIdent(), stagingLocation);
       Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
       TableIdentifier v1BackupIdentifier = new TableIdentifier(backupIdent.name(), backupNamespace);
+      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
       SparkTableUtil.importSparkTable(spark(), v1BackupIdentifier, icebergTable, stagingLocation);
 
+      LOG.info("Committing staged changes to {}", destTableIdent());
       stagedTable.commitStagedChanges();
       threw = false;
     } finally {
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java b/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
index 7062cdc..8a9f293 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
@@ -55,20 +55,22 @@ public class Spark3SnapshotAction extends Spark3CreateAction implements Snapshot
   }
 
   private Long doExecute() {
+    LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
     StagedSparkTable stagedTable = stageDestTable();
     Table icebergTable = stagedTable.table();
     // TODO Check table location here against source location
 
     boolean threw = true;
     try {
+      LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
       ensureNameMappingPresent(icebergTable);
 
       String stagingLocation = getMetadataLocation(icebergTable);
-      LOG.info("Beginning snapshot of {} to {} using metadata location {}", sourceTableIdent(), destTableIdent(),
-          stagingLocation);
-
       TableIdentifier v1TableIdentifier = v1SourceTable().identifier();
+      LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
       SparkTableUtil.importSparkTable(spark(), v1TableIdentifier, icebergTable, stagingLocation);
+
+      LOG.info("Committing staged changes to {}", destTableIdent());
       stagedTable.commitStagedChanges();
       threw = false;
     } finally {
@@ -85,7 +87,7 @@ public class Spark3SnapshotAction extends Spark3CreateAction implements Snapshot
 
     Snapshot snapshot = icebergTable.currentSnapshot();
     long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
-    LOG.info("Successfully loaded Iceberg metadata for {} files", numMigratedFiles);
+    LOG.info("Successfully loaded Iceberg metadata for {} files to {}", numMigratedFiles, destTableIdent());
     return numMigratedFiles;
   }