Posted to issues@iceberg.apache.org by "CodingCat (via GitHub)" <gi...@apache.org> on 2023/05/18 21:27:25 UTC

[GitHub] [iceberg] CodingCat opened a new pull request, #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

CodingCat opened a new pull request, #7649:
URL: https://github.com/apache/iceberg/pull/7649

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] CodingCat commented on a diff in pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "CodingCat (via GitHub)" <gi...@apache.org>.
CodingCat commented on code in PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#discussion_r1204944203


##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -445,4 +448,40 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();

Review Comment:
   updated



[GitHub] [iceberg] flyrain merged pull request #7649: Enable extra commit properties with metadata delete

Posted by "flyrain (via GitHub)" <gi...@apache.org>.
flyrain merged PR #7649:
URL: https://github.com/apache/iceberg/pull/7649


[GitHub] [iceberg] CodingCat commented on pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "CodingCat (via GitHub)" <gi...@apache.org>.
CodingCat commented on PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#issuecomment-1553672655

   cc: @rdblue @RussellSpitzer @flyrain, would you mind giving it a review? Thank you!


[GitHub] [iceberg] CodingCat commented on pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "CodingCat (via GitHub)" <gi...@apache.org>.
CodingCat commented on PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#issuecomment-1562207304

   > Thanks @CodingCat for the PR. Looks good to me overall. Left minor comments. I don't have much context on `CommitMetadata`. My understanding is that every time we generate a new snapshot, we need to inject it if it is present. Is it possible to set it in a base class shared by all snapshot producers?
   
   Thanks for the review, @flyrain!
   
   IIUC, you mean moving the code that consumes `CommitMetadata` into a base class, so that we don't run into the situation this PR tries to address again?
   
   I thought about it when I started working on this PR; it may involve a bit of refactoring in iceberg-spark.
   
   
   In the current implementation, `commitOperation` in [SparkWrite](https://github.com/apache/iceberg/blob/82880be39654bb94aaf370338bdd61f706caa6da/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L192) and the one in [SparkPositionDeltaWrite](https://github.com/apache/iceberg/blob/82880be39654bb94aaf370338bdd61f706caa6da/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java#L259) are separate but pretty much the same, so we could potentially move them to a base trait. My only concerns are:
   
   (1) Do we want to do that in this PR?
   (2) In the future, can we guarantee that all snapshot producers (at least in Spark) use the same commit implementation?
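
For illustration only, the base-class idea discussed above could look roughly like the sketch below. It is not Iceberg code: `SummaryTarget`, `ExtraCommitProperties`, `BaseCommitter`, and `RecordingCommitter` are hypothetical names, and the thread-local holder is an assumption modeled loosely on how the tests in this PR call `CommitMetadata.withCommitProperties` from a dedicated thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a snapshot-producing operation; not an Iceberg type.
interface SummaryTarget {
  void set(String key, String value);
}

// Hypothetical thread-local holder for extra commit properties.
final class ExtraCommitProperties {
  private static final ThreadLocal<Map<String, String>> CURRENT =
      ThreadLocal.withInitial(Collections::emptyMap);

  private ExtraCommitProperties() {}

  // Runs the action with the given properties visible on the calling thread only.
  static <R> R with(Map<String, String> properties, Supplier<R> action) {
    CURRENT.set(properties);
    try {
      return action.get();
    } finally {
      // Always reset so later commits on this thread do not inherit the properties.
      CURRENT.remove();
    }
  }

  static Map<String, String> current() {
    return CURRENT.get();
  }
}

// Hypothetical shared base: every commit path goes through the same injection step.
abstract class BaseCommitter {
  protected final void applyExtraProperties(SummaryTarget operation) {
    ExtraCommitProperties.current().forEach(operation::set);
  }
}

// Toy committer that records what would end up in the snapshot summary.
final class RecordingCommitter extends BaseCommitter {
  Map<String, String> commit() {
    Map<String, String> summary = new HashMap<>();
    applyExtraProperties(summary::put);
    return summary;
  }
}
```

With a shape like this, a new commit path (such as the metadata delete addressed here) would pick up the extra properties by extending the base instead of repeating the injection code. Whether that fits the real Spark write classes is exactly the open question in (1) and (2).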


[GitHub] [iceberg] CodingCat commented on a diff in pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "CodingCat (via GitHub)" <gi...@apache.org>.
CodingCat commented on code in PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#discussion_r1204943467


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -448,4 +451,52 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    spark.sql("SELECT * from " + tableName + ".files").show();
+    System.out.println(
+        spark
+            .sql("EXPLAIN DELETE FROM " + tableName + " where id = 1")
+            .collectAsList()
+            .get(0)
+            .get(0));
+    System.out.println("finished inserting");

Review Comment:
   removed



[GitHub] [iceberg] CodingCat commented on a diff in pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "CodingCat (via GitHub)" <gi...@apache.org>.
CodingCat commented on code in PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#discussion_r1204947405


##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -445,4 +448,40 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+    Set<String> threadNames = Sets.newHashSet();
+    Table table = validationCatalog.loadTable(tableIdent);
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    Assert.assertEquals(2, threadNames.size());
+    Assert.assertTrue(threadNames.contains(null));
+    Assert.assertTrue(threadNames.contains("test-extra-commit-message-delete-thread"));

Review Comment:
   updated



[GitHub] [iceberg] flyrain commented on a diff in pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "flyrain (via GitHub)" <gi...@apache.org>.
flyrain commented on code in PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#discussion_r1204884505


##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -445,4 +448,40 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {

Review Comment:
   Nit: no `IOException` is thrown here.



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -448,4 +451,52 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    spark.sql("SELECT * from " + tableName + ".files").show();

Review Comment:
   Do we need this line?



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -448,4 +451,52 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    spark.sql("SELECT * from " + tableName + ".files").show();
+    System.out.println(
+        spark
+            .sql("EXPLAIN DELETE FROM " + tableName + " where id = 1")
+            .collectAsList()
+            .get(0)
+            .get(0));
+    System.out.println("finished inserting");
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+    Set<String> threadNames = Sets.newHashSet();
+    spark.sql("SELECT * from " + tableName).show();
+    Table table = validationCatalog.loadTable(tableIdent);
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    for (String t : threadNames) {
+      System.out.println(t);
+    }

Review Comment:
   Is it necessary?



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -448,4 +451,52 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    spark.sql("SELECT * from " + tableName + ".files").show();
+    System.out.println(
+        spark
+            .sql("EXPLAIN DELETE FROM " + tableName + " where id = 1")
+            .collectAsList()
+            .get(0)
+            .get(0));
+    System.out.println("finished inserting");

Review Comment:
   Can we remove `System.out.println`?



##########
spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala:
##########
@@ -122,6 +122,8 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
         }
         filter
       }.toArray
+      // scalastyle:off
+      println("DeleteFromIcebergTable")

Review Comment:
   Is this change relevant?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -157,6 +157,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
 
     @Override
     public void commit(WriterCommitMessage[] messages) {
+

Review Comment:
   Nit: this seems irrelevant. Can we remove it?



##########
docs/spark-configuration.md:
##########
@@ -194,3 +194,20 @@ df.write
 | check-ordering       | true        | Checks if input schema and table schema are same  |
 | isolation-level | null | Desired isolation level for Dataframe overwrite operations.  `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. |
 | validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. |
+
+
+specifically, if you run SQL statements, you could use `org.apache.iceberg.spark.CommitMetadata` to add entries with custom-keys and corresponding values in the snapshot summary
+
+```java
+
+import org.apache.iceberg.spark.CommitMetadata;
+Map<String, String> properties = Maps.newHashMap();
+properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+CommitMetadata.withCommitProperties(properties,
+        () -> {
+        spark.sql("DELETE FROM " + tableName + " where id = 1");
+        return 0;
+        },
+        RuntimeException.class);
+
+```

Review Comment:
   Can we split this PR and move the doc change to a new PR? 



##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -445,4 +448,40 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();

Review Comment:
   Nit: an empty line to make it more readable?



##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -445,4 +448,40 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+    Set<String> threadNames = Sets.newHashSet();
+    Table table = validationCatalog.loadTable(tableIdent);
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    Assert.assertEquals(2, threadNames.size());
+    Assert.assertTrue(threadNames.contains(null));
+    Assert.assertTrue(threadNames.contains("test-extra-commit-message-delete-thread"));

Review Comment:
   Would it be more expressive to assert which snapshot has the property?
   ```
       List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
       Assert.assertEquals(2, snapshots.size());
       Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
       Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
   ``` 



[GitHub] [iceberg] CodingCat commented on a diff in pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "CodingCat (via GitHub)" <gi...@apache.org>.
CodingCat commented on code in PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#discussion_r1204947138


##########
docs/spark-configuration.md:
##########
@@ -194,3 +194,20 @@ df.write
 | check-ordering       | true        | Checks if input schema and table schema are same  |
 | isolation-level | null | Desired isolation level for Dataframe overwrite operations.  `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. |
 | validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. |
+
+
+specifically, if you run SQL statements, you could use `org.apache.iceberg.spark.CommitMetadata` to add entries with custom-keys and corresponding values in the snapshot summary
+
+```java
+
+import org.apache.iceberg.spark.CommitMetadata;
+Map<String, String> properties = Maps.newHashMap();
+properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+CommitMetadata.withCommitProperties(properties,
+        () -> {
+        spark.sql("DELETE FROM " + tableName + " where id = 1");
+        return 0;
+        },
+        RuntimeException.class);
+
+```

Review Comment:
   updated



[GitHub] [iceberg] CodingCat commented on a diff in pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "CodingCat (via GitHub)" <gi...@apache.org>.
CodingCat commented on code in PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#discussion_r1204943222


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -157,6 +157,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
 
     @Override
     public void commit(WriterCommitMessage[] messages) {
+

Review Comment:
   updated



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -448,4 +451,52 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    spark.sql("SELECT * from " + tableName + ".files").show();
+    System.out.println(
+        spark
+            .sql("EXPLAIN DELETE FROM " + tableName + " where id = 1")
+            .collectAsList()
+            .get(0)
+            .get(0));
+    System.out.println("finished inserting");
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+    Set<String> threadNames = Sets.newHashSet();
+    spark.sql("SELECT * from " + tableName).show();
+    Table table = validationCatalog.loadTable(tableIdent);
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    for (String t : threadNames) {
+      System.out.println(t);
+    }

Review Comment:
   Oops, forgot to clean up.



[GitHub] [iceberg] flyrain commented on a diff in pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "flyrain (via GitHub)" <gi...@apache.org>.
flyrain commented on code in PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#discussion_r1205825740


##########
docs/spark-configuration.md:
##########
@@ -194,3 +194,4 @@ df.write
 | check-ordering       | true        | Checks if input schema and table schema are same  |
 | isolation-level | null | Desired isolation level for Dataframe overwrite operations.  `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. |
 | validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. |
+

Review Comment:
   Can we remove this change?





[GitHub] [iceberg] CodingCat commented on a diff in pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "CodingCat (via GitHub)" <gi...@apache.org>.
CodingCat commented on code in PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#discussion_r1204943750


##########
spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala:
##########
@@ -122,6 +122,8 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
         }
         filter
       }.toArray
+      // scalastyle:off
+      println("DeleteFromIcebergTable")

Review Comment:
   removed





[GitHub] [iceberg] flyrain commented on pull request #7649: Enable extra commit properties with metadata delete

Posted by "flyrain (via GitHub)" <gi...@apache.org>.
flyrain commented on PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#issuecomment-1564710948

   Merged. Thanks @CodingCat for the change. 
   
   > what i mean was that if we move commitOperation to a base class, can we guarantee we always call the base class's commitOperation in future? (probably hard to make such a commitment, but that would break the effort of refactoring again)
   
   Not sure how it will look in the end, but it's worth a try.




[GitHub] [iceberg] CodingCat commented on a diff in pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "CodingCat (via GitHub)" <gi...@apache.org>.
CodingCat commented on code in PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#discussion_r1204943377


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -448,4 +451,52 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    spark.sql("SELECT * from " + tableName + ".files").show();

Review Comment:
   removed





[GitHub] [iceberg] CodingCat commented on a diff in pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "CodingCat (via GitHub)" <gi...@apache.org>.
CodingCat commented on code in PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#discussion_r1204942924


##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -445,4 +448,40 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {

Review Comment:
   good point, updated





[GitHub] [iceberg] CodingCat commented on pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

Posted by "CodingCat (via GitHub)" <gi...@apache.org>.
CodingCat commented on PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#issuecomment-1563286666

   > > IIUC, you mean we move the code consuming CommitMetadata to a base class so that we will not fall into the situation like this PR tries to address?
   > 
   > Exactly. We can refactor later. I'm OK with a quick fix like this.
   > 
   > > (2) in future, can we guarantee to use the same commit implementation for all snapshot producers (at least in Spark)?
   > 
   > I'm not sure if I got you correctly. I assume we do need the same implementation. Any concern?
   
   What I meant was: if we move `commitOperation` to a base class, can we guarantee that we always call the base class's `commitOperation` in the future? (It is probably hard to make such a commitment, and any new commit path that bypasses the base class would undo the refactoring effort.)
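
A minimal sketch of the base-class idea discussed in this thread, under stated assumptions: every name here (`CommitSketch`, `EXTRA`, `BaseCommitter`, `MetadataDelete`, `buildSummary`) is hypothetical and is not Iceberg's actual code. It only illustrates how declaring `commitOperation` as `final` in a base class could keep subclasses from skipping the step that attaches the extra commit properties.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

public class CommitSketch {
  // Hypothetical stand-in for thread-local extra commit properties.
  static final ThreadLocal<Map<String, String>> EXTRA =
      ThreadLocal.withInitial(HashMap::new);

  // Runs body with the given properties visible to commits on this thread,
  // then restores whatever was set before.
  static <R> R withCommitProperties(Map<String, String> props, Callable<R> body)
      throws Exception {
    Map<String, String> previous = EXTRA.get();
    EXTRA.set(new HashMap<>(props));
    try {
      return body.call();
    } finally {
      EXTRA.set(previous);
    }
  }

  abstract static class BaseCommitter {
    // final: a subclass cannot override this and drop the extra properties.
    final Map<String, String> commitOperation(String description) {
      Map<String, String> summary = new HashMap<>(buildSummary(description));
      summary.putAll(EXTRA.get());
      return summary;
    }

    // Subclasses only describe their own operation.
    protected abstract Map<String, String> buildSummary(String description);
  }

  // Hypothetical committer standing in for the metadata-delete path.
  static class MetadataDelete extends BaseCommitter {
    @Override
    protected Map<String, String> buildSummary(String description) {
      Map<String, String> summary = new HashMap<>();
      summary.put("operation", "delete");
      summary.put("description", description);
      return summary;
    }
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> props = new HashMap<>();
    props.put("writer-thread", Thread.currentThread().getName());
    Map<String, String> summary =
        withCommitProperties(props, () -> new MetadataDelete().commitOperation("id = 1"));
    System.out.println(summary);
  }
}
```

The `final` modifier only protects code paths that extend the base class. A new snapshot producer that commits without going through `BaseCommitter` would still miss the properties, which is the concern raised in the comment above.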

