You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/07 18:57:38 UTC
[iceberg] branch master updated: Spark: Refine logging in snapshot
and migrate actions (#2431)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 27eaa79 Spark: Refine logging in snapshot and migrate actions (#2431)
27eaa79 is described below
commit 27eaa796530f56eb99d111e6110be3e1a6eb99e7
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Apr 7 11:57:27 2021 -0700
Spark: Refine logging in snapshot and migrate actions (#2431)
---
.../java/org/apache/iceberg/actions/Spark3MigrateAction.java | 8 ++++++--
.../java/org/apache/iceberg/actions/Spark3SnapshotAction.java | 10 ++++++----
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java b/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
index 6a7ff58..acfe18c 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/actions/Spark3MigrateAction.java
@@ -60,6 +60,8 @@ public class Spark3MigrateAction extends Spark3CreateAction {
}
private Long doExecute() {
+ LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
+
// Move source table to a new name, halting all modifications and allowing us to stage
// the creation of a new Iceberg table in its place
renameAndBackupSourceTable();
@@ -68,18 +70,20 @@ public class Spark3MigrateAction extends Spark3CreateAction {
Table icebergTable;
boolean threw = true;
try {
+ LOG.info("Staging a new Iceberg table {}", destTableIdent());
stagedTable = stageDestTable();
icebergTable = stagedTable.table();
+ LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
ensureNameMappingPresent(icebergTable);
String stagingLocation = getMetadataLocation(icebergTable);
-
- LOG.info("Beginning migration of {} using metadata location {}", sourceTableIdent(), stagingLocation);
Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
TableIdentifier v1BackupIdentifier = new TableIdentifier(backupIdent.name(), backupNamespace);
+ LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
SparkTableUtil.importSparkTable(spark(), v1BackupIdentifier, icebergTable, stagingLocation);
+ LOG.info("Committing staged changes to {}", destTableIdent());
stagedTable.commitStagedChanges();
threw = false;
} finally {
diff --git a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java b/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
index 7062cdc..8a9f293 100644
--- a/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
+++ b/spark3/src/main/java/org/apache/iceberg/actions/Spark3SnapshotAction.java
@@ -55,20 +55,22 @@ public class Spark3SnapshotAction extends Spark3CreateAction implements Snapshot
}
private Long doExecute() {
+ LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
StagedSparkTable stagedTable = stageDestTable();
Table icebergTable = stagedTable.table();
// TODO Check table location here against source location
boolean threw = true;
try {
+ LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
ensureNameMappingPresent(icebergTable);
String stagingLocation = getMetadataLocation(icebergTable);
- LOG.info("Beginning snapshot of {} to {} using metadata location {}", sourceTableIdent(), destTableIdent(),
- stagingLocation);
-
TableIdentifier v1TableIdentifier = v1SourceTable().identifier();
+ LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
SparkTableUtil.importSparkTable(spark(), v1TableIdentifier, icebergTable, stagingLocation);
+
+ LOG.info("Committing staged changes to {}", destTableIdent());
stagedTable.commitStagedChanges();
threw = false;
} finally {
@@ -85,7 +87,7 @@ public class Spark3SnapshotAction extends Spark3CreateAction implements Snapshot
Snapshot snapshot = icebergTable.currentSnapshot();
long numMigratedFiles = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
- LOG.info("Successfully loaded Iceberg metadata for {} files", numMigratedFiles);
+ LOG.info("Successfully loaded Iceberg metadata for {} files to {}", numMigratedFiles, destTableIdent());
return numMigratedFiles;
}