You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/01 20:06:00 UTC
[incubator-iceberg] branch master updated: Spark: Add support for write-audit-publish workflows (#342)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a7bed2b Spark: Add support for write-audit-publish workflows (#342)
a7bed2b is described below
commit a7bed2bfde9ca0a179790bddf7e145c9eb6dce71
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Aug 1 13:05:56 2019 -0700
Spark: Add support for write-audit-publish workflows (#342)
---
.../main/java/org/apache/iceberg/SnapshotUpdate.java | 7 +++++++
.../java/org/apache/iceberg/SnapshotProducer.java | 15 ++++++++++++++-
.../main/java/org/apache/iceberg/TableMetadata.java | 10 ++++++++++
.../java/org/apache/iceberg/TableProperties.java | 3 +++
.../apache/iceberg/spark/source/IcebergSource.java | 3 ++-
.../java/org/apache/iceberg/spark/source/Writer.java | 20 ++++++++++++++++++++
6 files changed, 56 insertions(+), 2 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index 5cabc02..fdd1a63 100644
--- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -45,4 +45,11 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
*/
ThisT deleteWith(Consumer<String> deleteFunc);
+ /**
+ * Called to stage a snapshot in table metadata, but not update the current snapshot id.
+ *
+ * @return this for method chaining
+ */
+ ThisT stageOnly();
+
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index e8cc29b..83f15b5 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -79,6 +79,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
private final List<String> manifestLists = Lists.newArrayList();
private Long snapshotId = null;
private TableMetadata base = null;
+ private boolean stageOnly = false;
private Consumer<String> deleteFunc = defaultDelete;
protected SnapshotProducer(TableOperations ops) {
@@ -97,6 +98,12 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
protected abstract ThisT self();
@Override
+ public ThisT stageOnly() {
+ this.stageOnly = true;
+ return self();
+ }
+
+ @Override
public ThisT deleteWith(Consumer<String> deleteCallback) {
Preconditions.checkArgument(this.deleteFunc == defaultDelete, "Cannot set delete callback more than once");
this.deleteFunc = deleteCallback;
@@ -230,7 +237,13 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
.run(taskOps -> {
Snapshot newSnapshot = apply();
newSnapshotId.set(newSnapshot.snapshotId());
- TableMetadata updated = base.replaceCurrentSnapshot(newSnapshot);
+ TableMetadata updated;
+ if (stageOnly) {
+ updated = base.addStagedSnapshot(newSnapshot);
+ } else {
+ updated = base.replaceCurrentSnapshot(newSnapshot);
+ }
+
// if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
// to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
taskOps.commit(base, updated.withUUID());
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index ef289b4..ea015ba 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -321,6 +321,16 @@ public class TableMetadata {
currentSnapshotId, snapshots, snapshotLog);
}
+ public TableMetadata addStagedSnapshot(Snapshot snapshot) {
+ List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
+ .addAll(snapshots)
+ .add(snapshot)
+ .build();
+ return new TableMetadata(ops, null, uuid, location,
+ snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ currentSnapshotId, newSnapshots, snapshotLog);
+ }
+
public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
.addAll(snapshots)
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 4dd778d..2ca8a25 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -93,4 +93,7 @@ public class TableProperties {
public static final String DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)";
public static final String DEFAULT_NAME_MAPPING = "schema.name-mapping.default";
+
+ public static final String WRITE_AUDIT_PUBLISH_ENABLED = "write.wap.enabled";
+ public static final String WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT = "false";
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index b6dfade..3656787 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -75,7 +75,8 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
Table table = getTableAndResolveHadoopConfiguration(options, conf);
validateWriteSchema(table.schema(), dsStruct);
String appId = lazySparkSession().sparkContext().applicationId();
- return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite, appId));
+ String wapId = lazySparkSession().conf().get("spark.wap.id", null);
+ return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite, appId, wapId));
}
@Override
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index aa9becc..f520505 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
@@ -87,14 +88,20 @@ class Writer implements DataSourceWriter {
private final EncryptionManager encryptionManager;
private final boolean replacePartitions;
private final String applicationId;
+ private final String wapId;
Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId) {
+ this(table, options, replacePartitions, applicationId, null);
+ }
+
+ Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId, String wapId) {
this.table = table;
this.format = getFileFormat(table.properties(), options);
this.fileIo = table.io();
this.encryptionManager = table.encryption();
this.replacePartitions = replacePartitions;
this.applicationId = applicationId;
+ this.wapId = wapId;
}
private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
@@ -104,6 +111,11 @@ class Writer implements DataSourceWriter {
return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
}
+ private boolean isWapTable() {
+ return Boolean.parseBoolean(table.properties().getOrDefault(
+ TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
+ }
+
@Override
public DataWriterFactory<InternalRow> createWriterFactory() {
return new WriterFactory(
@@ -124,6 +136,14 @@ class Writer implements DataSourceWriter {
if (applicationId != null) {
operation.set("spark.app.id", applicationId);
}
+
+ if (isWapTable() && wapId != null) {
+ // write-audit-publish is enabled for this table and job
+ // stage the changes without changing the current snapshot
+ operation.set("wap.id", wapId);
+ operation.stageOnly();
+ }
+
long start = System.currentTimeMillis();
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;