Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/01 20:06:00 UTC

[incubator-iceberg] branch master updated: Spark: Add support for write-audit-publish workflows (#342)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a7bed2b  Spark: Add support for write-audit-publish workflows (#342)
a7bed2b is described below

commit a7bed2bfde9ca0a179790bddf7e145c9eb6dce71
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Aug 1 13:05:56 2019 -0700

    Spark: Add support for write-audit-publish workflows (#342)
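    
    With write-audit-publish (WAP), a job stages its output as a snapshot
    that is recorded in table metadata but not made the current table state,
    so the data can be audited before it is published. Staging applies when
    the table sets write.wap.enabled=true and the writing Spark session
    carries an ID in spark.wap.id. A minimal sketch of the write side (the
    ID value, DataFrame, and table location below are illustrative, not
    part of this change):
    
        // stage an audited append instead of committing it as current
        spark.conf().set("spark.wap.id", "20190801-batch-01");
        df.write().format("iceberg").mode("append").save(tableLocation);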
---
 .../main/java/org/apache/iceberg/SnapshotUpdate.java |  7 +++++++
 .../java/org/apache/iceberg/SnapshotProducer.java    | 15 ++++++++++++++-
 .../main/java/org/apache/iceberg/TableMetadata.java  | 10 ++++++++++
 .../java/org/apache/iceberg/TableProperties.java     |  3 +++
 .../apache/iceberg/spark/source/IcebergSource.java   |  3 ++-
 .../java/org/apache/iceberg/spark/source/Writer.java | 20 ++++++++++++++++++++
 6 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index 5cabc02..fdd1a63 100644
--- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -45,4 +45,11 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
    */
   ThisT deleteWith(Consumer<String> deleteFunc);
 
+  /**
+   * Called to stage a snapshot in table metadata, but not update the current snapshot id.
+   *
+   * @return this for method chaining
+   */
+  ThisT stageOnly();
+
 }
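
The new stageOnly() call composes with any snapshot-producing operation. A
minimal sketch against the core API (assumes an org.apache.iceberg.Table
handle and a previously built DataFile; both are placeholders):

    // stage an append: the snapshot is written, but the current snapshot id is not moved
    table.newAppend()
        .appendFile(dataFile)
        .stageOnly()
        .commit();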
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index e8cc29b..83f15b5 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -79,6 +79,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   private final List<String> manifestLists = Lists.newArrayList();
   private Long snapshotId = null;
   private TableMetadata base = null;
+  private boolean stageOnly = false;
   private Consumer<String> deleteFunc = defaultDelete;
 
   protected SnapshotProducer(TableOperations ops) {
@@ -97,6 +98,12 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   protected abstract ThisT self();
 
   @Override
+  public ThisT stageOnly() {
+    this.stageOnly = true;
+    return self();
+  }
+
+  @Override
   public ThisT deleteWith(Consumer<String> deleteCallback) {
     Preconditions.checkArgument(this.deleteFunc == defaultDelete, "Cannot set delete callback more than once");
     this.deleteFunc = deleteCallback;
@@ -230,7 +237,13 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
           .run(taskOps -> {
             Snapshot newSnapshot = apply();
             newSnapshotId.set(newSnapshot.snapshotId());
-            TableMetadata updated = base.replaceCurrentSnapshot(newSnapshot);
+            TableMetadata updated;
+            if (stageOnly) {
+              updated = base.addStagedSnapshot(newSnapshot);
+            } else {
+              updated = base.replaceCurrentSnapshot(newSnapshot);
+            }
+
             // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
             // to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
             taskOps.commit(base, updated.withUUID());
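
With stageOnly set, commit() routes through addStagedSnapshot instead of
replaceCurrentSnapshot, so the new snapshot is listed without becoming
current. A quick way to observe this (a sketch; assumes a table that
already has a current snapshot, and dataFile is a placeholder):

    // the staged snapshot appears in the snapshot list only
    long currentBefore = table.currentSnapshot().snapshotId();
    table.newAppend().appendFile(dataFile).stageOnly().commit();
    table.refresh();
    System.out.println(table.currentSnapshot().snapshotId() == currentBefore); // true
    table.snapshots().forEach(s -> System.out.println(s.snapshotId()));        // includes the staged id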
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index ef289b4..ea015ba 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -321,6 +321,16 @@ public class TableMetadata {
         currentSnapshotId, snapshots, snapshotLog);
   }
 
+  public TableMetadata addStagedSnapshot(Snapshot snapshot) {
+    List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
+        .addAll(snapshots)
+        .add(snapshot)
+        .build();
+    return new TableMetadata(ops, null, uuid, location,
+        snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, newSnapshots, snapshotLog);
+  }
+
   public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
     List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
         .addAll(snapshots)
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 4dd778d..2ca8a25 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -93,4 +93,7 @@ public class TableProperties {
   public static final String DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)";
 
   public static final String DEFAULT_NAME_MAPPING = "schema.name-mapping.default";
+
+  public static final String WRITE_AUDIT_PUBLISH_ENABLED = "write.wap.enabled";
+  public static final String WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT = "false";
 }
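
The property gates WAP per table and defaults to false. One way to opt a
table in through the API (a sketch; other configuration paths are out of
scope here):

    // enable write-audit-publish for this table
    table.updateProperties()
        .set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true")
        .commit();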
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index b6dfade..3656787 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -75,7 +75,8 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
     validateWriteSchema(table.schema(), dsStruct);
     String appId = lazySparkSession().sparkContext().applicationId();
-    return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite, appId));
+    String wapId = lazySparkSession().conf().get("spark.wap.id", null);
+    return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite, appId, wapId));
   }
 
   @Override
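
Because the ID is read from the Spark session configuration, it can also be
fixed when the session is built, which is convenient for batch jobs. A
sketch (the app name and ID are illustrative):

    // supply the WAP ID up front for every write in this session
    SparkSession spark = SparkSession.builder()
        .appName("wap-staging-job")
        .config("spark.wap.id", "20190801-batch-01")
        .getOrCreate();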
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index aa9becc..f520505 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
@@ -87,14 +88,20 @@ class Writer implements DataSourceWriter {
   private final EncryptionManager encryptionManager;
   private final boolean replacePartitions;
   private final String applicationId;
+  private final String wapId;
 
   Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId) {
+    this(table, options, replacePartitions, applicationId, null);
+  }
+
+  Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId, String wapId) {
     this.table = table;
     this.format = getFileFormat(table.properties(), options);
     this.fileIo = table.io();
     this.encryptionManager = table.encryption();
     this.replacePartitions = replacePartitions;
     this.applicationId = applicationId;
+    this.wapId = wapId;
   }
 
   private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
@@ -104,6 +111,11 @@ class Writer implements DataSourceWriter {
     return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
   }
 
+  private boolean isWapTable() {
+    return Boolean.parseBoolean(table.properties().getOrDefault(
+        TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
+  }
+
   @Override
   public DataWriterFactory<InternalRow> createWriterFactory() {
     return new WriterFactory(
@@ -124,6 +136,14 @@ class Writer implements DataSourceWriter {
     if (applicationId != null) {
       operation.set("spark.app.id", applicationId);
     }
+
+    if (isWapTable() && wapId != null) {
+      // write-audit-publish is enabled for this table and job
+      // stage the changes without changing the current snapshot
+      operation.set("wap.id", wapId);
+      operation.stageOnly();
+    }
+
     long start = System.currentTimeMillis();
     operation.commit(); // abort is automatically called if this fails
     long duration = System.currentTimeMillis() - start;
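
The wap.id summary property is what makes a staged snapshot discoverable
for the audit step; publishing a staged snapshot is not part of this diff.
A sketch of locating a staged snapshot by its ID (the ID value is
illustrative):

    // find the snapshot staged by a given WAP job for auditing
    String wapId = "20190801-batch-01";
    table.refresh();
    for (Snapshot snap : table.snapshots()) {
      if (wapId.equals(snap.summary().get("wap.id"))) {
        System.out.println("staged snapshot to audit: " + snap.snapshotId());
      }
    }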