Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/06 22:48:21 UTC

[iceberg] branch master updated: Spark: Add CommitMetadata class to pass additional snapshot properties (#4956)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0adf678b7 Spark: Add CommitMetadata class to pass additional snapshot properties (#4956)
0adf678b7 is described below

commit 0adf678b79ff97b4d1454bb762be3143404c10d2
Author: Nan Zhu <Co...@users.noreply.github.com>
AuthorDate: Mon Jun 6 15:48:15 2022 -0700

    Spark: Add CommitMetadata class to pass additional snapshot properties (#4956)
    
    This is needed because Spark provides no way to pass additional snapshot metadata for some operations, such as SQL commands.
---
 .../org/apache/iceberg/spark/CommitMetadata.java   | 61 ++++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkWrite.java    |  5 ++
 .../spark/source/TestDataSourceOptions.java        | 41 +++++++++++++++
 .../org/apache/iceberg/spark/CommitMetadata.java   | 61 ++++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkWrite.java    |  5 ++
 .../spark/source/TestDataSourceOptions.java        | 41 +++++++++++++++
 .../org/apache/iceberg/spark/CommitMetadata.java   | 61 ++++++++++++++++++++++
 .../spark/source/SparkPositionDeltaWrite.java      |  5 ++
 .../apache/iceberg/spark/source/SparkWrite.java    |  5 ++
 .../spark/source/TestDataSourceOptions.java        | 41 +++++++++++++++
 10 files changed, 326 insertions(+)
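
For context, a minimal usage sketch of the new CommitMetadata API follows, mirroring the test added in this commit. The SparkSession (spark) and the temp view (target) are assumptions for illustration, not part of the commit:

    import java.util.Map;
    import org.apache.iceberg.relocated.com.google.common.collect.Maps;
    import org.apache.iceberg.spark.CommitMetadata;

    Map<String, String> properties = Maps.newHashMap();
    properties.put("writer-thread", Thread.currentThread().getName());

    // The properties live in a ThreadLocal, so they are attached only to
    // snapshots committed from this thread while the callable runs.
    CommitMetadata.withCommitProperties(properties, () -> {
      spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
      return 0;
    }, RuntimeException.class);

The properties are reset in a finally block, so they do not leak into later commits on the same thread.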

diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
new file mode 100644
index 000000000..581372500
--- /dev/null
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ExceptionUtil;
+
+/**
+ * Utility class that holds thread-local commit properties to attach to snapshots.
+ */
+public class CommitMetadata {
+
+  private CommitMetadata() {
+
+  }
+
+  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
+
+  /**
+   * Runs the given callable and attaches the metadata defined in {@code properties} to any
+   * snapshot committed while the callable is running.
+   * @param properties extra commit metadata to attach to snapshots committed within the callable
+   * @param callable the code to execute
+   * @param exClass the expected type of exception thrown by the callable
+   */
+  public static <R, E extends Exception> R withCommitProperties(
+      Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
+    COMMIT_PROPERTIES.set(properties);
+    try {
+      return callable.call();
+    } catch (Throwable e) {
+      ExceptionUtil.castAndThrow(e, exClass);
+      return null;
+    } finally {
+      COMMIT_PROPERTIES.set(ImmutableMap.of());
+    }
+  }
+
+  public static Map<String, String> commitProperties() {
+    return COMMIT_PROPERTIES.get();
+  }
+}
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 80e6de96a..930f49ef6 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -57,6 +57,7 @@ import org.apache.iceberg.io.RollingDataWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.SparkWriteConf;
 import org.apache.iceberg.util.PropertyUtil;
@@ -174,6 +175,10 @@ class SparkWrite {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(operation::set);
+    }
+
     if (isWapTable() && wapId != null) {
       // write-audit-publish is enabled for this table and job
       // stage the changes without changing the current snapshot
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 55605288b..4fa8fdfc6 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -31,13 +32,16 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
@@ -404,4 +408,41 @@ public class TestDataSourceOptions {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Thread writerThread = new Thread(() -> {
+      Map<String, String> properties = Maps.newHashMap();
+      properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+      CommitMetadata.withCommitProperties(properties, () -> {
+        spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+        return 0;
+      }, RuntimeException.class);
+    });
+    writerThread.setName("test-extra-commit-message-writer-thread");
+    writerThread.start();
+    writerThread.join();
+    Set<String> threadNames = Sets.newHashSet();
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    Assert.assertEquals(2, threadNames.size());
+    Assert.assertTrue(threadNames.contains(null));
+    Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+  }
 }
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
new file mode 100644
index 000000000..581372500
--- /dev/null
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ExceptionUtil;
+
+/**
+ * Utility class that holds thread-local commit properties to attach to snapshots.
+ */
+public class CommitMetadata {
+
+  private CommitMetadata() {
+
+  }
+
+  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
+
+  /**
+   * Runs the given callable and attaches the metadata defined in {@code properties} to any
+   * snapshot committed while the callable is running.
+   * @param properties extra commit metadata to attach to snapshots committed within the callable
+   * @param callable the code to execute
+   * @param exClass the expected type of exception thrown by the callable
+   */
+  public static <R, E extends Exception> R withCommitProperties(
+      Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
+    COMMIT_PROPERTIES.set(properties);
+    try {
+      return callable.call();
+    } catch (Throwable e) {
+      ExceptionUtil.castAndThrow(e, exClass);
+      return null;
+    } finally {
+      COMMIT_PROPERTIES.set(ImmutableMap.of());
+    }
+  }
+
+  public static Map<String, String> commitProperties() {
+    return COMMIT_PROPERTIES.get();
+  }
+}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 80e6de96a..930f49ef6 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -57,6 +57,7 @@ import org.apache.iceberg.io.RollingDataWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.SparkWriteConf;
 import org.apache.iceberg.util.PropertyUtil;
@@ -174,6 +175,10 @@ class SparkWrite {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(operation::set);
+    }
+
     if (isWapTable() && wapId != null) {
       // write-audit-publish is enabled for this table and job
       // stage the changes without changing the current snapshot
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 55605288b..4fa8fdfc6 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -31,13 +32,16 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
@@ -404,4 +408,41 @@ public class TestDataSourceOptions {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Thread writerThread = new Thread(() -> {
+      Map<String, String> properties = Maps.newHashMap();
+      properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+      CommitMetadata.withCommitProperties(properties, () -> {
+        spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+        return 0;
+      }, RuntimeException.class);
+    });
+    writerThread.setName("test-extra-commit-message-writer-thread");
+    writerThread.start();
+    writerThread.join();
+    Set<String> threadNames = Sets.newHashSet();
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    Assert.assertEquals(2, threadNames.size());
+    Assert.assertTrue(threadNames.contains(null));
+    Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
new file mode 100644
index 000000000..581372500
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ExceptionUtil;
+
+/**
+ * Utility class that holds thread-local commit properties to attach to snapshots.
+ */
+public class CommitMetadata {
+
+  private CommitMetadata() {
+
+  }
+
+  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
+
+  /**
+   * Runs the given callable and attaches the metadata defined in {@code properties} to any
+   * snapshot committed while the callable is running.
+   * @param properties extra commit metadata to attach to snapshots committed within the callable
+   * @param callable the code to execute
+   * @param exClass the expected type of exception thrown by the callable
+   */
+  public static <R, E extends Exception> R withCommitProperties(
+      Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
+    COMMIT_PROPERTIES.set(properties);
+    try {
+      return callable.call();
+    } catch (Throwable e) {
+      ExceptionUtil.castAndThrow(e, exClass);
+      return null;
+    } finally {
+      COMMIT_PROPERTIES.set(ImmutableMap.of());
+    }
+  }
+
+  public static Map<String, String> commitProperties() {
+    return COMMIT_PROPERTIES.get();
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 7c5b80afb..846e0735e 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -54,6 +54,7 @@ import org.apache.iceberg.io.PartitioningWriter;
 import org.apache.iceberg.io.PositionDeltaWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkWriteConf;
 import org.apache.iceberg.types.Types;
@@ -249,6 +250,10 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
 
       extraSnapshotMetadata.forEach(operation::set);
 
+      if (!CommitMetadata.commitProperties().isEmpty()) {
+        CommitMetadata.commitProperties().forEach(operation::set);
+      }
+
       if (wapEnabled && wapId != null) {
         // write-audit-publish is enabled for this table and job
         // stage the changes without changing the current snapshot
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index e919a83ab..b714b8055 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.io.RollingDataWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.SparkWriteConf;
 import org.apache.iceberg.util.PropertyUtil;
@@ -192,6 +193,10 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(operation::set);
+    }
+
     if (wapEnabled && wapId != null) {
       // write-audit-publish is enabled for this table and job
       // stage the changes without changing the current snapshot
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 091696d58..00a11c55a 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -31,13 +32,16 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
@@ -404,4 +408,41 @@ public class TestDataSourceOptions {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Thread writerThread = new Thread(() -> {
+      Map<String, String> properties = Maps.newHashMap();
+      properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+      CommitMetadata.withCommitProperties(properties, () -> {
+        spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+        return 0;
+      }, RuntimeException.class);
+    });
+    writerThread.setName("test-extra-commit-message-writer-thread");
+    writerThread.start();
+    writerThread.join();
+    Set<String> threadNames = Sets.newHashSet();
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    Assert.assertEquals(2, threadNames.size());
+    Assert.assertTrue(threadNames.contains(null));
+    Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+  }
 }