You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/06 22:48:21 UTC
[iceberg] branch master updated: Spark: Add CommitMetadata class to pass additional snapshot properties (#4956)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0adf678b7 Spark: Add CommitMetadata class to pass additional snapshot properties (#4956)
0adf678b7 is described below
commit 0adf678b79ff97b4d1454bb762be3143404c10d2
Author: Nan Zhu <Co...@users.noreply.github.com>
AuthorDate: Mon Jun 6 15:48:15 2022 -0700
Spark: Add CommitMetadata class to pass additional snapshot properties (#4956)
This is needed because Spark cannot pass additional metadata for some operations.
---
.../org/apache/iceberg/spark/CommitMetadata.java | 61 ++++++++++++++++++++++
.../apache/iceberg/spark/source/SparkWrite.java | 5 ++
.../spark/source/TestDataSourceOptions.java | 41 +++++++++++++++
.../org/apache/iceberg/spark/CommitMetadata.java | 61 ++++++++++++++++++++++
.../apache/iceberg/spark/source/SparkWrite.java | 5 ++
.../spark/source/TestDataSourceOptions.java | 41 +++++++++++++++
.../org/apache/iceberg/spark/CommitMetadata.java | 61 ++++++++++++++++++++++
.../spark/source/SparkPositionDeltaWrite.java | 5 ++
.../apache/iceberg/spark/source/SparkWrite.java | 5 ++
.../spark/source/TestDataSourceOptions.java | 41 +++++++++++++++
10 files changed, 326 insertions(+)
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
new file mode 100644
index 000000000..581372500
--- /dev/null
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ExceptionUtil;
+
+/**
+ * Utility class that scopes extra snapshot summary properties to commits made by the current thread.
+ */
+public class CommitMetadata {
+  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
+
+  private CommitMetadata() {
+  }
+
+  /**
+   * Runs the callable with the given properties visible through {@link #commitProperties()} on the calling thread.
+   * The properties that were visible before the call are restored afterwards, so nested calls keep the outer scope.
+   * @param properties extra commit metadata to attach to the snapshot committed within callable, null means none
+   * @param callable the code to be executed
+   * @param exClass the expected type of exception which would be thrown from callable
+   */
+  public static <R, E extends Exception> R withCommitProperties(
+      Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
+    Map<String, String> previous = COMMIT_PROPERTIES.get();
+    COMMIT_PROPERTIES.set(properties != null ? properties : ImmutableMap.of());
+    try {
+      return callable.call();
+    } catch (Throwable e) {
+      ExceptionUtil.castAndThrow(e, exClass);
+      return null;
+    } finally {
+      COMMIT_PROPERTIES.set(previous);
+    }
+  }
+
+  /** Returns the commit properties currently visible to the calling thread. */
+  public static Map<String, String> commitProperties() {
+    return COMMIT_PROPERTIES.get();
+  }
+}
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 80e6de96a..930f49ef6 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -57,6 +57,7 @@ import org.apache.iceberg.io.RollingDataWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.util.PropertyUtil;
@@ -174,6 +175,10 @@ class SparkWrite {
extraSnapshotMetadata.forEach(operation::set);
}
+ if (!CommitMetadata.commitProperties().isEmpty()) {
+ CommitMetadata.commitProperties().forEach(operation::set);
+ }
+
if (isWapTable() && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 55605288b..4fa8fdfc6 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
@@ -31,13 +32,16 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
@@ -404,4 +408,41 @@ public class TestDataSourceOptions {
Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
}
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException {
+    // Create an unpartitioned table and append a first batch without any extra commit properties.
+    String location = temp.newFolder("iceberg-table").toString();
+    Table table = new HadoopTables(CONF).create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), location);
+
+    List<SimpleRecord> initialRecords = Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+    spark.createDataFrame(initialRecords, SimpleRecord.class)
+        .select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(location);
+    spark.read().format("iceberg").load(location).createOrReplaceTempView("target");
+
+    // Second write: a SQL insert issued from a named thread inside withCommitProperties.
+    String writerName = "test-extra-commit-message-writer-thread";
+    Thread writer = new Thread(() -> {
+      Map<String, String> commitProps = Maps.newHashMap();
+      commitProps.put("writer-thread", Thread.currentThread().getName());
+      CommitMetadata.withCommitProperties(commitProps, () -> {
+        spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+        return 0;
+      }, RuntimeException.class);
+    }, writerName);
+    writer.start();
+    writer.join();
+
+    // Collect the "writer-thread" summary value of every snapshot; snapshots without it contribute null.
+    Set<String> seenWriters = Sets.newHashSet();
+    table.snapshots().forEach(snapshot -> seenWriters.add(snapshot.summary().get("writer-thread")));
+
+    Assert.assertEquals(2, seenWriters.size());
+    Assert.assertTrue(seenWriters.contains(null));
+    Assert.assertTrue(seenWriters.contains(writerName));
+  }
}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
new file mode 100644
index 000000000..581372500
--- /dev/null
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ExceptionUtil;
+
+/**
+ * Utility class that scopes extra snapshot summary properties to commits made by the current thread.
+ */
+public class CommitMetadata {
+  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
+
+  private CommitMetadata() {
+  }
+
+  /**
+   * Runs the callable with the given properties visible through {@link #commitProperties()} on the calling thread.
+   * The properties that were visible before the call are restored afterwards, so nested calls keep the outer scope.
+   * @param properties extra commit metadata to attach to the snapshot committed within callable, null means none
+   * @param callable the code to be executed
+   * @param exClass the expected type of exception which would be thrown from callable
+   */
+  public static <R, E extends Exception> R withCommitProperties(
+      Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
+    Map<String, String> previous = COMMIT_PROPERTIES.get();
+    COMMIT_PROPERTIES.set(properties != null ? properties : ImmutableMap.of());
+    try {
+      return callable.call();
+    } catch (Throwable e) {
+      ExceptionUtil.castAndThrow(e, exClass);
+      return null;
+    } finally {
+      COMMIT_PROPERTIES.set(previous);
+    }
+  }
+
+  /** Returns the commit properties currently visible to the calling thread. */
+  public static Map<String, String> commitProperties() {
+    return COMMIT_PROPERTIES.get();
+  }
+}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 80e6de96a..930f49ef6 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -57,6 +57,7 @@ import org.apache.iceberg.io.RollingDataWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.util.PropertyUtil;
@@ -174,6 +175,10 @@ class SparkWrite {
extraSnapshotMetadata.forEach(operation::set);
}
+ if (!CommitMetadata.commitProperties().isEmpty()) {
+ CommitMetadata.commitProperties().forEach(operation::set);
+ }
+
if (isWapTable() && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 55605288b..4fa8fdfc6 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
@@ -31,13 +32,16 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
@@ -404,4 +408,41 @@ public class TestDataSourceOptions {
Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
}
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException {
+    // Create an unpartitioned table and append a first batch without any extra commit properties.
+    String location = temp.newFolder("iceberg-table").toString();
+    Table table = new HadoopTables(CONF).create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), location);
+
+    List<SimpleRecord> initialRecords = Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+    spark.createDataFrame(initialRecords, SimpleRecord.class)
+        .select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(location);
+    spark.read().format("iceberg").load(location).createOrReplaceTempView("target");
+
+    // Second write: a SQL insert issued from a named thread inside withCommitProperties.
+    String writerName = "test-extra-commit-message-writer-thread";
+    Thread writer = new Thread(() -> {
+      Map<String, String> commitProps = Maps.newHashMap();
+      commitProps.put("writer-thread", Thread.currentThread().getName());
+      CommitMetadata.withCommitProperties(commitProps, () -> {
+        spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+        return 0;
+      }, RuntimeException.class);
+    }, writerName);
+    writer.start();
+    writer.join();
+
+    // Collect the "writer-thread" summary value of every snapshot; snapshots without it contribute null.
+    Set<String> seenWriters = Sets.newHashSet();
+    table.snapshots().forEach(snapshot -> seenWriters.add(snapshot.summary().get("writer-thread")));
+
+    Assert.assertEquals(2, seenWriters.size());
+    Assert.assertTrue(seenWriters.contains(null));
+    Assert.assertTrue(seenWriters.contains(writerName));
+  }
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
new file mode 100644
index 000000000..581372500
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ExceptionUtil;
+
+/**
+ * Utility class that scopes extra snapshot summary properties to commits made by the current thread.
+ */
+public class CommitMetadata {
+  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
+
+  private CommitMetadata() {
+  }
+
+  /**
+   * Runs the callable with the given properties visible through {@link #commitProperties()} on the calling thread.
+   * The properties that were visible before the call are restored afterwards, so nested calls keep the outer scope.
+   * @param properties extra commit metadata to attach to the snapshot committed within callable, null means none
+   * @param callable the code to be executed
+   * @param exClass the expected type of exception which would be thrown from callable
+   */
+  public static <R, E extends Exception> R withCommitProperties(
+      Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
+    Map<String, String> previous = COMMIT_PROPERTIES.get();
+    COMMIT_PROPERTIES.set(properties != null ? properties : ImmutableMap.of());
+    try {
+      return callable.call();
+    } catch (Throwable e) {
+      ExceptionUtil.castAndThrow(e, exClass);
+      return null;
+    } finally {
+      COMMIT_PROPERTIES.set(previous);
+    }
+  }
+
+  /** Returns the commit properties currently visible to the calling thread. */
+  public static Map<String, String> commitProperties() {
+    return COMMIT_PROPERTIES.get();
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 7c5b80afb..846e0735e 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -54,6 +54,7 @@ import org.apache.iceberg.io.PartitioningWriter;
import org.apache.iceberg.io.PositionDeltaWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.types.Types;
@@ -249,6 +250,10 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
extraSnapshotMetadata.forEach(operation::set);
+ if (!CommitMetadata.commitProperties().isEmpty()) {
+ CommitMetadata.commitProperties().forEach(operation::set);
+ }
+
if (wapEnabled && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index e919a83ab..b714b8055 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.io.RollingDataWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.util.PropertyUtil;
@@ -192,6 +193,10 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
extraSnapshotMetadata.forEach(operation::set);
}
+ if (!CommitMetadata.commitProperties().isEmpty()) {
+ CommitMetadata.commitProperties().forEach(operation::set);
+ }
+
if (wapEnabled && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 091696d58..00a11c55a 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
@@ -31,13 +32,16 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
@@ -404,4 +408,41 @@ public class TestDataSourceOptions {
Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
}
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException {
+    // Create an unpartitioned table and append a first batch without any extra commit properties.
+    String location = temp.newFolder("iceberg-table").toString();
+    Table table = new HadoopTables(CONF).create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), location);
+
+    List<SimpleRecord> initialRecords = Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+    spark.createDataFrame(initialRecords, SimpleRecord.class)
+        .select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(location);
+    spark.read().format("iceberg").load(location).createOrReplaceTempView("target");
+
+    // Second write: a SQL insert issued from a named thread inside withCommitProperties.
+    String writerName = "test-extra-commit-message-writer-thread";
+    Thread writer = new Thread(() -> {
+      Map<String, String> commitProps = Maps.newHashMap();
+      commitProps.put("writer-thread", Thread.currentThread().getName());
+      CommitMetadata.withCommitProperties(commitProps, () -> {
+        spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+        return 0;
+      }, RuntimeException.class);
+    }, writerName);
+    writer.start();
+    writer.join();
+
+    // Collect the "writer-thread" summary value of every snapshot; snapshots without it contribute null.
+    Set<String> seenWriters = Sets.newHashSet();
+    table.snapshots().forEach(snapshot -> seenWriters.add(snapshot.summary().get("writer-thread")));
+
+    Assert.assertEquals(2, seenWriters.size());
+    Assert.assertTrue(seenWriters.contains(null));
+    Assert.assertTrue(seenWriters.contains(writerName));
+  }
}