Posted to issues@iceberg.apache.org by "stevenzwu (via GitHub)" <gi...@apache.org> on 2023/04/14 16:43:57 UTC

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

stevenzwu commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1167033674


##########
core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java:
##########
@@ -198,6 +200,21 @@ public BaseRewriteDataFilesAction<ThisT> filter(Expression expr) {
     return this;
   }
 
+  /**
+   * If the compaction should use the sequence number of the snapshot at compaction start time for
+   * new data files, instead of using the sequence number of the newly produced snapshot.
+   *
+   * <p>This avoids commit conflicts with updates that add newer equality deletes at a higher
+   * sequence number.
+   *
+   * @param useStarting use starting sequence number if set to true
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> useStartingSequenceNumber(boolean useStarting) {

Review Comment:
   It seems that `BaseRewriteDataFilesAction` is only extended by the Flink actions. I am fine with adding this API in the core module as a quick fix for now, and I agree that the long-term plan should be built on the new rewrite interfaces. But for backward compatibility, should the default be `false`? The Flink rewrite action can then pass in `true`.
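
   A minimal sketch of that suggestion, assuming a simplified self-typed builder. The class names `BaseRewriteSketch`, `PlainRewriteSketch`, and `FlinkRewriteSketch` and the getter `usesStartingSequenceNumber()` are hypothetical stand-ins, not the real Iceberg classes. The setter here returns the self type rather than the base type used in the diff.

```java
/** Hypothetical stand-in for the core base action; not the real Iceberg class. */
abstract class BaseRewriteSketch<ThisT extends BaseRewriteSketch<ThisT>> {
  // Defaults to false so that existing callers keep the previous behavior.
  private boolean useStartingSequenceNumber = false;

  protected abstract ThisT self();

  /** Same shape as the setter in the diff, simplified to return the self type. */
  public ThisT useStartingSequenceNumber(boolean useStarting) {
    this.useStartingSequenceNumber = useStarting;
    return self();
  }

  /** Hypothetical getter, added only so the flag can be inspected. */
  public boolean usesStartingSequenceNumber() {
    return useStartingSequenceNumber;
  }
}

/** Hypothetical subclass that inherits the backward-compatible default. */
class PlainRewriteSketch extends BaseRewriteSketch<PlainRewriteSketch> {
  @Override
  protected PlainRewriteSketch self() {
    return this;
  }
}

/** Hypothetical Flink-side subclass that opts in by passing true. */
class FlinkRewriteSketch extends BaseRewriteSketch<FlinkRewriteSketch> {
  FlinkRewriteSketch() {
    useStartingSequenceNumber(true);
  }

  @Override
  protected FlinkRewriteSketch self() {
    return this;
  }
}
```

   With this shape, callers of the base class see no behavior change, and a Flink caller can still call `useStartingSequenceNumber(false)` to get the old behavior back.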



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,56 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =

Review Comment:
   Aren't `stale1` and `stale2` referencing the same snapshot in this case? Did you mean to load the table after each insert statement above?



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,56 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);
+
+    Assertions.assertThatThrownBy(
+            () ->
+                Actions.forTable(stale1)
+                    .rewriteDataFiles()
+                    .useStartingSequenceNumber(false)
+                    .execute(),
+            "Rewrite using new sequence number should fail")
+        .isInstanceOf(ValidationException.class);
+    // Rewrite using the starting sequence number should succeed

Review Comment:
   nit: add an empty line here so it is easy to see that there are two separate tests.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,56 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);

Review Comment:
   Should we also add assertions on the file sequence numbers here?
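
   To make the reasoning behind such assertions concrete, here is a toy model of the rule that an equality delete applies only to data files with a strictly lower data sequence number. `SequenceNumberSketch` and its methods are hypothetical, and the numbers in the usage note (compaction planned at 2, upsert committed at 3, rewrite committed at 4) are assumptions for illustration, not values taken from the test.

```java
/** Toy model of data sequence numbers; hypothetical names, not Iceberg classes. */
final class SequenceNumberSketch {
  private SequenceNumberSketch() {}

  /** An equality delete applies only to data files with a strictly lower sequence number. */
  static boolean equalityDeleteApplies(long dataFileSeq, long deleteFileSeq) {
    return dataFileSeq < deleteFileSeq;
  }

  /** Sequence number a rewritten data file would carry under each setting. */
  static long rewrittenFileSeq(long startingSeq, long newSnapshotSeq, boolean useStarting) {
    return useStarting ? startingSeq : newSnapshotSeq;
  }
}
```

   Under the assumed numbers, `rewrittenFileSeq(2, 4, true)` keeps the compacted file at 2, so the equality delete at 3 still applies to it. With `false` the compacted file would move to 4 and the delete would no longer cover it, which is the situation the `ValidationException` branch of the test guards against.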



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

