You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "flyrain (via GitHub)" <gi...@apache.org> on 2023/05/25 01:44:39 UTC

[GitHub] [iceberg] flyrain commented on a diff in pull request #7649: enable adding commit msg with metadata delete and add docs about adding commit msg with sql

flyrain commented on code in PR #7649:
URL: https://github.com/apache/iceberg/pull/7649#discussion_r1204884505


##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -445,4 +448,40 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {

Review Comment:
   Nit: No IOException is thrown here.



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -448,4 +451,52 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    spark.sql("SELECT * from " + tableName + ".files").show();

Review Comment:
   Do we need this line?



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -448,4 +451,52 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    spark.sql("SELECT * from " + tableName + ".files").show();
+    System.out.println(
+        spark
+            .sql("EXPLAIN DELETE FROM " + tableName + " where id = 1")
+            .collectAsList()
+            .get(0)
+            .get(0));
+    System.out.println("finished inserting");
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+    Set<String> threadNames = Sets.newHashSet();
+    spark.sql("SELECT * from " + tableName).show();
+    Table table = validationCatalog.loadTable(tableIdent);
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    for (String t : threadNames) {
+      System.out.println(t);
+    }

Review Comment:
   Is this loop that prints the thread names necessary?



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -448,4 +451,52 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    spark.sql("SELECT * from " + tableName + ".files").show();
+    System.out.println(
+        spark
+            .sql("EXPLAIN DELETE FROM " + tableName + " where id = 1")
+            .collectAsList()
+            .get(0)
+            .get(0));
+    System.out.println("finished inserting");

Review Comment:
   Can we remove `System.out.println`?



##########
spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala:
##########
@@ -122,6 +122,8 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
         }
         filter
       }.toArray
+      // scalastyle:off
+      println("DeleteFromIcebergTable")

Review Comment:
   Is this change relevant?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -157,6 +157,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
 
     @Override
     public void commit(WriterCommitMessage[] messages) {
+

Review Comment:
   Nit: this seems irrelevant. Can we remove it?



##########
docs/spark-configuration.md:
##########
@@ -194,3 +194,20 @@ df.write
 | check-ordering       | true        | Checks if input schema and table schema are same  |
 | isolation-level | null | Desired isolation level for Dataframe overwrite operations.  `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. |
 | validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. |
+
+
+specifically, if you run SQL statements, you could use `org.apache.iceberg.spark.CommitMetadata` to add entries with custom-keys and corresponding values in the snapshot summary
+
+```java
+
+import org.apache.iceberg.spark.CommitMetadata;
+Map<String, String> properties = Maps.newHashMap();
+properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+CommitMetadata.withCommitProperties(properties,
+        () -> {
+        spark.sql("DELETE FROM " + tableName + " where id = 1");
+        return 0;
+        },
+        RuntimeException.class);
+
+```

Review Comment:
   Can we split this PR and move the doc change to a new PR? 



##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -445,4 +448,40 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();

Review Comment:
   Nit: could we add an empty line here to make it more readable?



##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java:
##########
@@ -445,4 +448,40 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx
     Assert.assertTrue(threadNames.contains(null));
     Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, IOException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+    Set<String> threadNames = Sets.newHashSet();
+    Table table = validationCatalog.loadTable(tableIdent);
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    Assert.assertEquals(2, threadNames.size());
+    Assert.assertTrue(threadNames.contains(null));
+    Assert.assertTrue(threadNames.contains("test-extra-commit-message-delete-thread"));

Review Comment:
   Would it be a bit more expressive to show which snapshot has the property?
   ```
       List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
       Assert.assertEquals(2, snapshots.size());
       Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
       Assert.assertEquals("test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
   ``` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org