You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/03 15:58:06 UTC

[GitHub] [iceberg] CodingCat opened a new pull request, #4956: support adding extra commit metadata with SQL in Spark

CodingCat opened a new pull request, #4956:
URL: https://github.com/apache/iceberg/pull/4956

   this PR implements the functionality for users to add extra commit metadata when operating tables with SQL. It also allows users to use multi threading to commit data to tables while keeping metadata thread local 
   
   new usage:
   
   ```scala
   (0 until 10).foreach { _ =>
   
   
       new Thread() {
         override def run() {
           CommitMetadata.withCommitProperties(Map("metadata-key" -> "thread-local-metadata-value").asJava,
             () => {
               SparkSession.getActiveSession.get.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
             })
         }
       }.start()
   
     }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889196775


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * utility class to accept thread local commit properties
+ */
+public class CommitMetadata {

Review Comment:
   Nit: The naming of this might be somewhat confusing / cause import conflicts with spark-sql's `CommitMetadata` case class for streaming commit metadata.
   
   It would be nice to find a way to reuse `extraSnapshotMetadata` function (or a similarly named function) if possible. Maybe an explicit `extraThreadLocalSnapshotMetadata` function or class would be more clear? Seeing `CommitMetadata`, I in no way expect any of its state to be thread local.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] CodingCat commented on pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
CodingCat commented on PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#issuecomment-1146980546

   > What was wrong with the `CommitMetadata` name? I think we should have a simpler name than `CallerWithCommitMetadata`. That's a bit too confusing. I think the original name was good.
   
   https://github.com/apache/iceberg/pull/4956#discussion_r889199722 @kbendick mentioned some potential conflict with SS's CommitMetadata here, I don't have strong opinion on this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889095947


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Tasks
+        .range(threadsCount)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(executorService)
+        .run(index -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              try {
+                CommitMetadata.withCommitProperties(properties, () -> {
+                  spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+                  return 0;
+                });
+              } catch (Exception e) {
+                e.printStackTrace();
+              }
+            }
+        );
+    Set<String> threadNames = new HashSet<>();

Review Comment:
   Use `Sets.newHashSet()` from Guava.



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Tasks
+        .range(threadsCount)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(executorService)
+        .run(index -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              try {
+                CommitMetadata.withCommitProperties(properties, () -> {
+                  spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+                  return 0;
+                });
+              } catch (Exception e) {
+                e.printStackTrace();
+              }
+            }
+        );
+    Set<String> threadNames = new HashSet<>();
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    assertTrue(threadNames.contains("thread-0"));
+    assertTrue(threadNames.contains("thread-1"));
+    assertTrue(threadNames.contains("thread-2"));
+

Review Comment:
   Unnecessary newline.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889099407


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Tasks
+        .range(threadsCount)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(executorService)
+        .run(index -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              try {
+                CommitMetadata.withCommitProperties(properties, () -> {
+                  spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+                  return 0;
+                });
+              } catch (Exception e) {
+                e.printStackTrace();
+              }
+            }
+        );
+    Set<String> threadNames = new HashSet<>();
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    assertTrue(threadNames.contains("thread-0"));

Review Comment:
   Add empty newlines between a control flow blocks and the following statement.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] CodingCat commented on pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
CodingCat commented on PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#issuecomment-1147975994

   @kbendick @rdblue @singhpk234 updated the PR accordingly, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] CodingCat commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
CodingCat commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889486763


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Tasks
+        .range(threadsCount)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(executorService)
+        .run(index -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              try {
+                CommitMetadata.withCommitProperties(properties, () -> {
+                  spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+                  return 0;
+                });
+              } catch (Exception e) {
+                e.printStackTrace();
+              }
+            }
+        );
+    Set<String> threadNames = new HashSet<>();
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    assertTrue(threadNames.contains("thread-0"));

Review Comment:
   updated



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Tasks

Review Comment:
   updated



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Tasks
+        .range(threadsCount)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(executorService)
+        .run(index -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              try {
+                CommitMetadata.withCommitProperties(properties, () -> {
+                  spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+                  return 0;
+                });
+              } catch (Exception e) {
+                e.printStackTrace();

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] singhpk234 commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r890346116


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -192,6 +193,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
+    if (!CallerWithCommitMetadata.commitProperties().isEmpty()) {
+      CallerWithCommitMetadata.commitProperties().forEach(operation::set);
+    }

Review Comment:
   [question] Should we also add this to SparkPositionDeltaWrite
   https://github.com/apache/iceberg/blob/b06a89cebd5099b40b188c4c40ea7b1a23d3427a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java#L250
   
   
   since starting 3.2 iceberg support MOR with pos deletes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] singhpk234 commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r890342898


##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -174,6 +175,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
+    if (!CallerWithCommitMetadata.commitProperties().isEmpty()) {
+      CallerWithCommitMetadata.commitProperties().forEach(operation::set);
+    }

Review Comment:
   [doubt](Probably not in scope of PR) Should we add a validation that the keys passed from here doesn't override the keys already present in the snasphot-summary ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] CodingCat commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
CodingCat commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889486793


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889718080


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {

Review Comment:
   I don't think multi-threaded testing is needed. It's enough to know that we're using a thread-local. This also is not guaranteed to run the way this test assumes that it will. There is not a guarantee that the thread pool will scale all the way up, and there's no guarantee that the tasks will each run in a separate thread. I think it's likely that those will happen, but this could still be a source of flakiness later on. Also, this doesn't necessarily test that the thread-local is working properly because there's no guarantee of concurrency across tasks.
   
   While it's probably working the way you expect, there's no guarantee that it must. So I'd prefer to keep the test simple.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r890359233


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -192,6 +193,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
+    if (!CallerWithCommitMetadata.commitProperties().isEmpty()) {
+      CallerWithCommitMetadata.commitProperties().forEach(operation::set);
+    }

Review Comment:
   Yes, that's a good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#issuecomment-1147681623

   @CodingCat, I think renaming the class back is about the only thing left to fix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889116614


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Tasks
+        .range(threadsCount)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(executorService)
+        .run(index -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              try {
+                CommitMetadata.withCommitProperties(properties, () -> {
+                  spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+                  return 0;
+                });
+              } catch (Exception e) {
+                e.printStackTrace();

Review Comment:
   This shouldn't print. Instead, wrap it in a `RuntimeException`.
   
   Another option is to parameterize `withCommitProperties` so that you can specify what type of exception may be thrown. Then you can use `RuntimeException` here and not add the try/catch. An example of this is the `runSafely` method: https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/util/ExceptionUtil.java#L56



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] CodingCat commented on pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
CodingCat commented on PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#issuecomment-1146539628

   thanks! @rdblue and @kbendick , just updated the PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889191784


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * utility class to accept thread local commit properties
+ */
+public class CommitMetadata {

Review Comment:
   Does this class need to be `public`? Could it be made package-private?
   
   I have concerns around the usage of `ThreadLocal` for things that most cases don't need to be thread local. I don't want to give users too much room to hurt themselves because they don't consider that `CommitMetadata` is _only_ threadlocal and then their writes not working properly in the common case of writes without user-side multithreading (e.g. it gets set in one thread somewhere, but another thread is used for commit).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r890626043


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * utility class to accept thread local commit properties
+ */
+public class CommitMetadata {

Review Comment:
   > Does this class need to be public? Could it be made package-private?
   
   Yes, this does need to be public because it is a way for Iceberg users to pass metadata.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889199722


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * utility class to accept thread local commit properties
+ */
+public class CommitMetadata {
+  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
+
+  public static <R> R withCommitProperties(Map<String, String> properties, Callable<R> callable) throws Exception {
+    COMMIT_PROPERTIES.set(properties);
+    try {
+      return callable.call();
+    } finally {
+      COMMIT_PROPERTIES.set(ImmutableMap.of());
+    }
+  }

Review Comment:
   I guess since this requires a `Callable`, then it's not a huge deal that the class only supports ThreadLocal CommitMetadata.
   
   But to not conflict with Structured Streaming's `CommitMetadata` (especially in Java programs that can't rename imports), it might be a good idea to update the name as mentioned above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] CodingCat commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
CodingCat commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889486746


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Tasks
+        .range(threadsCount)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(executorService)
+        .run(index -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              try {
+                CommitMetadata.withCommitProperties(properties, () -> {
+                  spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+                  return 0;
+                });
+              } catch (Exception e) {
+                e.printStackTrace();
+              }
+            }
+        );
+    Set<String> threadNames = new HashSet<>();
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    assertTrue(threadNames.contains("thread-0"));
+    assertTrue(threadNames.contains("thread-1"));
+    assertTrue(threadNames.contains("thread-2"));
+

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889196775


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * utility class to accept thread local commit properties
+ */
+public class CommitMetadata {

Review Comment:
   Nit: The naming of this might be somewhat confusing / cause import conflicts with spark-sql's `CommitMetadata` case class for streaming commit metadata.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889106852


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Tasks

Review Comment:
   I'd add empty lines before and after this to make the method more readable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] singhpk234 commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r890338142


##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ExceptionUtil;
+
+/**
+ * utility class to accept thread local commit properties
+ */
+public class CallerWithCommitMetadata {
+
+  private CallerWithCommitMetadata() {
+
+  }
+
+  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
+
+  /**
+   * running the code wrapped as a caller, and any snapshot committed within the callable object will be attached with
+   * the metadata defined in properties
+   * @param properties extra commit metadata to attach to the snapshot committed within callable
+   * @param callable the code to be executed
+   * @param exClass the expected type of exception which would be thrown from callable
+   */
+  public static <R, E extends Exception> R withCommitProperties(
+      Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
+    COMMIT_PROPERTIES.set(properties);
+    try {
+      return callable.call();
+    } catch (Throwable e) {
+      ExceptionUtil.castAndThrow(e, exClass);
+      return null;

Review Comment:
   [minor] is this required as we throw in the line above ? 



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -192,6 +193,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
+    if (!CallerWithCommitMetadata.commitProperties().isEmpty()) {
+      CallerWithCommitMetadata.commitProperties().forEach(operation::set);
+    }

Review Comment:
   [question] Should we also add this to SparkPositionWrite
   https://github.com/apache/iceberg/blob/b06a89cebd5099b40b188c4c40ea7b1a23d3427a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java#L250
   
   
   since starting 3.2 iceberg support MOR with pos deletes



##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -174,6 +175,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
+    if (!CallerWithCommitMetadata.commitProperties().isEmpty()) {
+      CallerWithCommitMetadata.commitProperties().forEach(operation::set);
+    }

Review Comment:
   [doubt](Probably not in scope of PR) I see there is no validation happening in the keys passed from here doesn't override the keys already present in the summary, should we also add one ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] singhpk234 commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r890342898


##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -174,6 +175,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
+    if (!CallerWithCommitMetadata.commitProperties().isEmpty()) {
+      CallerWithCommitMetadata.commitProperties().forEach(operation::set);
+    }

Review Comment:
   [doubt](Probably not in scope of PR) Should we add a validation that the keys passed from here doesn't override the keys already present in the snasphot-summary ? Or this functionality is intended to do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] CodingCat commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
CodingCat commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889486899


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * utility class to accept thread local commit properties
+ */
+public class CommitMetadata {

Review Comment:
   sure, changed to CallerWithCommitMetadata....but...eh...not sure if it is better or worse....



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * utility class to accept thread local commit properties
+ */
+public class CommitMetadata {

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] CodingCat commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
CodingCat commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889486223


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {

Review Comment:
   updated to ThreadPool, I think multi-threading testing is still necessary? as we need to have something guarding that the commit metadata change is thread safe no matter we use ThreadLocal as now or later we change to something else for any reason 
   



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Tasks
+        .range(threadsCount)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(executorService)
+        .run(index -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              try {
+                CommitMetadata.withCommitProperties(properties, () -> {
+                  spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+                  return 0;
+                });
+              } catch (Exception e) {
+                e.printStackTrace();
+              }
+            }
+        );
+    Set<String> threadNames = new HashSet<>();

Review Comment:
   updated
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] CodingCat commented on pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
CodingCat commented on PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#issuecomment-1146900956

   just to confirm, seems we don't support SQL based table insert/merge in Spark 2.4 at all right? (so this functionality is not relevant there)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889194071


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {

Review Comment:
   I would also prefer a test that uses a single write in the current thread without any additional threading business. I worry that `CommitMetadata` doesn't _seem_ thread local to users.
   
   And then if a multi-threaded test is needed, using the helpers from `ThreadPools` as suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889116820


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")

Review Comment:
   Are these checked? If we don't need them, I think it would be better to omit them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] CodingCat commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
CodingCat commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889743158


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r890358534


##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -174,6 +175,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
+    if (!CallerWithCommitMetadata.commitProperties().isEmpty()) {
+      CallerWithCommitMetadata.commitProperties().forEach(operation::set);
+    }

Review Comment:
   I don't think that we need to worry about this. It is unlikely to conflict and if it does conflict, it's up to the caller to decide what to do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r890359764


##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ExceptionUtil;
+
+/**
+ * utility class to accept thread local commit properties
+ */
+public class CallerWithCommitMetadata {
+
+  private CallerWithCommitMetadata() {
+
+  }
+
+  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
+
+  /**
+   * running the code wrapped as a caller, and any snapshot committed within the callable object will be attached with
+   * the metadata defined in properties
+   * @param properties extra commit metadata to attach to the snapshot committed within callable
+   * @param callable the code to be executed
+   * @param exClass the expected type of exception which would be thrown from callable
+   */
+  public static <R, E extends Exception> R withCommitProperties(
+      Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
+    COMMIT_PROPERTIES.set(properties);
+    try {
+      return callable.call();
+    } catch (Throwable e) {
+      ExceptionUtil.castAndThrow(e, exClass);
+      return null;

Review Comment:
   I don't think the compiler sees that `castAndThrow` will always throw, so it needs this to know what to do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue merged PR #4956:
URL: https://github.com/apache/iceberg/pull/4956


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] CodingCat commented on pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
CodingCat commented on PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#issuecomment-1146103898

   Hi, @kbendick @rdblue  I just made this PR as a followup of https://github.com/apache/iceberg/pull/4795, please help to review and thanks in advance! once we agree on the approach here, I will add changes to other versions of Spark 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889091459


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -404,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {

Review Comment:
   Can you use the helper in `ThreadPools` and also wrap it in a try/finally to close the threadpool?
   
   I'm also not entirely sure this requires a threadpool to test. I think it would be fine to test a single write in the current thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r889191784


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * utility class to accept thread local commit properties
+ */
+public class CommitMetadata {

Review Comment:
   Does this class need to be `public`? Could it be made package-private?
   
   I have concerns around the usage of `ThreadLocal` for things that most cases don't need to be thread local. I don't want to give users too much room to hurt themselves because they don't consider that `CommitMetadata` is _only_ threadlocal and then their writes not working properly in the common case of writes without user-side multithreading (e.g. it gets set in one thread somewhere, but another thread is used for commit).
   
   EDIT - Since this takes a `Callable`, it's less of a concern. I would still name it in a way that's a bit more reflective of the thread local nature (especially if we wanted a `CommitMetadata` class one day that doens't require a callable and is persistent). That and I always prefer things be package-private if possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#issuecomment-1148008561

   Thanks, @CodingCat!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
kbendick commented on PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#issuecomment-1147682895

   > > What was wrong with the `CommitMetadata` name? I think we should have a simpler name than `CallerWithCommitMetadata`. That's a bit too confusing. I think the original name was good.
   > 
   > [#4956 (comment)](https://github.com/apache/iceberg/pull/4956#discussion_r889199722) @kbendick mentioned some potential conflict with SS's CommitMetadata here, I don't have strong opinion on this
   
   I’m good with the original name too (and prefer it to the new one).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] singhpk234 commented on a diff in pull request #4956: support adding extra commit metadata with SQL in Spark

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #4956:
URL: https://github.com/apache/iceberg/pull/4956#discussion_r890342898


##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -174,6 +175,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
+    if (!CallerWithCommitMetadata.commitProperties().isEmpty()) {
+      CallerWithCommitMetadata.commitProperties().forEach(operation::set);
+    }

Review Comment:
   [doubt](Probably not in scope of PR) Should we add a validation that the keys passed from here  as well as extraSnapshotMetadata doesn't override the keys already present in the snasphot-summary ? Or this functionality is intended to do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org