Posted to issues@iceberg.apache.org by "linyanghao (via GitHub)" <gi...@apache.org> on 2023/03/27 10:07:07 UTC

[GitHub] [iceberg] linyanghao opened a new pull request, #7218: Flink: Use starting sequence number by default when rewriting data files

linyanghao opened a new pull request, #7218:
URL: https://github.com/apache/iceberg/pull/7218

   When data files are rewritten with Spark, the rewritten files are assigned the original starting sequence number to avoid possible conflicts with newer eq-delete files. When data files are rewritten with Flink, however, the rewritten files are currently assigned a new sequence number. Consequently, conflicts between rewritten files and new eq-delete files cannot be ignored during the conflict check in MergingSnapshotProducer.validateNoNewDeletesForDataFiles(). This becomes a problem when new eq-delete files are continuously committed to a table, because files can then no longer be rewritten without conflicts.
   
   This PR proposes to solve this problem by enabling Flink to use the starting sequence number by default when rewriting data files.
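
To illustrate why the starting sequence number matters, here is a minimal, self-contained sketch. It does not use the Iceberg API: the class and method names are hypothetical, and the only rule it models is the one from the table spec, namely that an equality delete applies to a data file only when the data file's data sequence number is strictly lower than the delete file's. The row values mirror the unit test discussed later in this thread.

```java
import java.util.ArrayList;
import java.util.List;

public class EqDeleteSequenceSketch {

  // Spec rule (modeled, not Iceberg code): an equality delete applies to a data file
  // only if the data file's data sequence number is strictly less than the delete's.
  static boolean equalityDeleteApplies(long dataFileSeq, long deleteFileSeq) {
    return dataFileSeq < deleteFileSeq;
  }

  // Rows still readable from a compacted file holding (1, "hello") and (2, "world")
  // when an equality delete on id = 1 was committed at sequence number deleteSeq.
  static List<String> readCompactedFile(long compactedFileSeq, long deleteSeq) {
    int[] ids = {1, 2};
    String[] values = {"hello", "world"};
    List<String> visible = new ArrayList<>();
    for (int i = 0; i < ids.length; i++) {
      boolean deleted = ids[i] == 1 && equalityDeleteApplies(compactedFileSeq, deleteSeq);
      if (!deleted) {
        visible.add(ids[i] + "=" + values[i]);
      }
    }
    return visible;
  }

  public static void main(String[] args) {
    // Starting sequence number (2): the delete at 3 still applies to the compacted file.
    System.out.println(readCompactedFile(2L, 3L)); // prints [2=world]
    // New sequence number (4): the delete at 3 no longer applies, so the deleted row
    // would reappear. This is why such a commit has to be rejected as a conflict.
    System.out.println(readCompactedFile(4L, 3L)); // prints [1=hello, 2=world]
  }
}
```

With the starting sequence number, the newer equality delete keeps applying to the compacted file, so the conflict can safely be ignored.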


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] linyanghao commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1169180646


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,56 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);

Review Comment:
   Added an assertion that compares the sequence number of the current snapshot with that of the stale ones. Is this what you meant?





[GitHub] [iceberg] linyanghao commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1169175054


##########
core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java:
##########
@@ -198,6 +200,21 @@ public BaseRewriteDataFilesAction<ThisT> filter(Expression expr) {
     return this;
   }
 
+  /**
+   * If the compaction should use the sequence number of the snapshot at compaction start time for
+   * new data files, instead of using the sequence number of the newly produced snapshot.
+   *
+   * <p>This avoids commit conflicts with updates that add newer equality deletes at a higher
+   * sequence number.
+   *
+   * @param useStarting use starting sequence number if set to true
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> useStartingSequenceNumber(boolean useStarting) {

Review Comment:
   That makes sense.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1173268655


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +405,84 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    // Since the first rewrite will refresh stale1, we need another stale2 for the second rewrite
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    Assert.assertEquals(
+        "The latest sequence number should be greater than that of the stale snapshot",
+        stale1.currentSnapshot().sequenceNumber() + 1,
+        icebergTableWithPk.currentSnapshot().sequenceNumber());
+
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);
+    shouldHaveDataAndFileSequenceNumbers(
+        TABLE_NAME_WITH_PK,
+        ImmutableList.of(Pair.of(1L, 1L), Pair.of(2L, 2L), Pair.of(3L, 3L), Pair.of(3L, 3L)));
+
+    Assertions.assertThatThrownBy(
+            () ->
+                Actions.forTable(stale1)
+                    .rewriteDataFiles()
+                    .useStartingSequenceNumber(false)
+                    .execute(),
+            "Rewrite using new sequence number should fail")
+        .isInstanceOf(ValidationException.class);
+
+    // Rewrite using the starting sequence number should succeed
+    RewriteDataFilesActionResult result =
+        Actions.forTable(stale2).rewriteDataFiles().useStartingSequenceNumber(true).execute();
+
+    // Should not rewrite files from the new commit
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+    shouldHaveDataAndFileSequenceNumbers(
+        TABLE_NAME_WITH_PK, ImmutableList.of(Pair.of(3L, 3L), Pair.of(3L, 3L), Pair.of(2L, 4L)));

Review Comment:
   nit: add a comment that the pair (2L, 4L) demonstrates the expected behavior of useStartingSequenceNumber
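
For readers following the assertion above, here is a small sketch of how a pair such as (2L, 4L) comes about. It is a hypothetical model, not the Iceberg implementation. It assumes the pair is <data sequence number, file sequence number>, that the rewrite started from a snapshot with sequence number 2, and that it commits as sequence number 4.

```java
import java.util.Arrays;

public class RewriteSequenceNumberSketch {

  // Returns {data sequence number, file sequence number} for a file added by a rewrite.
  // startingSeq: sequence number of the snapshot the rewrite started from.
  // commitSeq: sequence number of the snapshot that the rewrite commits.
  static long[] sequenceNumbers(long startingSeq, long commitSeq, boolean useStarting) {
    // The file sequence number records the snapshot that added the file.
    long fileSeq = commitSeq;
    // The data sequence number is the value that delete files are compared against.
    long dataSeq = useStarting ? startingSeq : commitSeq;
    return new long[] {dataSeq, fileSeq};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(sequenceNumbers(2L, 4L, true)));  // prints [2, 4]
    System.out.println(Arrays.toString(sequenceNumbers(2L, 4L, false))); // prints [4, 4]
  }
}
```

The second call is purely illustrative: in the test above, the rewrite with useStartingSequenceNumber(false) fails validation and never commits.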



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +405,84 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    // Since the first rewrite will refresh stale1, we need another stale2 for the second rewrite
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    Assert.assertEquals(
+        "The latest sequence number should be greater than that of the stale snapshot",
+        stale1.currentSnapshot().sequenceNumber() + 1,
+        icebergTableWithPk.currentSnapshot().sequenceNumber());
+
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);
+    shouldHaveDataAndFileSequenceNumbers(
+        TABLE_NAME_WITH_PK,
+        ImmutableList.of(Pair.of(1L, 1L), Pair.of(2L, 2L), Pair.of(3L, 3L), Pair.of(3L, 3L)));
+
+    Assertions.assertThatThrownBy(
+            () ->
+                Actions.forTable(stale1)
+                    .rewriteDataFiles()
+                    .useStartingSequenceNumber(false)
+                    .execute(),
+            "Rewrite using new sequence number should fail")
+        .isInstanceOf(ValidationException.class);
+
+    // Rewrite using the starting sequence number should succeed
+    RewriteDataFilesActionResult result =
+        Actions.forTable(stale2).rewriteDataFiles().useStartingSequenceNumber(true).execute();
+
+    // Should not rewrite files from the new commit
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+    shouldHaveDataAndFileSequenceNumbers(
+        TABLE_NAME_WITH_PK, ImmutableList.of(Pair.of(3L, 3L), Pair.of(3L, 3L), Pair.of(2L, 4L)));
+
+    // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(
+        icebergTableWithPk,
+        Lists.newArrayList(
+            SimpleDataUtil.createRecord(1, "hi"), SimpleDataUtil.createRecord(2, "world")));
+  }
+
+  private void shouldHaveDataAndFileSequenceNumbers(

Review Comment:
   nit: can you add a short description of the pair<long, long> as <sequence number, file sequence number>?





[GitHub] [iceberg] stevenzwu commented on pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#issuecomment-1518121710

   @linyanghao please create the backport PRs for Flink 1.15 and 1.17. Please check the diff for each backport PR, e.g.:
   
   ```
    git diff --no-index  flink/v1.16/flink/src/ flink/v1.17/flink/src
   ```




[GitHub] [iceberg] linyanghao commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1173355010


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +405,84 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    // Since the first rewrite will refresh stale1, we need another stale2 for the second rewrite
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    Assert.assertEquals(
+        "The latest sequence number should be greater than that of the stale snapshot",
+        stale1.currentSnapshot().sequenceNumber() + 1,
+        icebergTableWithPk.currentSnapshot().sequenceNumber());
+
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);
+    shouldHaveDataAndFileSequenceNumbers(
+        TABLE_NAME_WITH_PK,
+        ImmutableList.of(Pair.of(1L, 1L), Pair.of(2L, 2L), Pair.of(3L, 3L), Pair.of(3L, 3L)));
+
+    Assertions.assertThatThrownBy(
+            () ->
+                Actions.forTable(stale1)
+                    .rewriteDataFiles()
+                    .useStartingSequenceNumber(false)
+                    .execute(),
+            "Rewrite using new sequence number should fail")
+        .isInstanceOf(ValidationException.class);
+
+    // Rewrite using the starting sequence number should succeed
+    RewriteDataFilesActionResult result =
+        Actions.forTable(stale2).rewriteDataFiles().useStartingSequenceNumber(true).execute();
+
+    // Should not rewrite files from the new commit
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+    shouldHaveDataAndFileSequenceNumbers(
+        TABLE_NAME_WITH_PK, ImmutableList.of(Pair.of(3L, 3L), Pair.of(3L, 3L), Pair.of(2L, 4L)));

Review Comment:
   Added



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +405,84 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    // Since the first rewrite will refresh stale1, we need another stale2 for the second rewrite
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    Assert.assertEquals(
+        "The latest sequence number should be greater than that of the stale snapshot",
+        stale1.currentSnapshot().sequenceNumber() + 1,
+        icebergTableWithPk.currentSnapshot().sequenceNumber());
+
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);
+    shouldHaveDataAndFileSequenceNumbers(
+        TABLE_NAME_WITH_PK,
+        ImmutableList.of(Pair.of(1L, 1L), Pair.of(2L, 2L), Pair.of(3L, 3L), Pair.of(3L, 3L)));
+
+    Assertions.assertThatThrownBy(
+            () ->
+                Actions.forTable(stale1)
+                    .rewriteDataFiles()
+                    .useStartingSequenceNumber(false)
+                    .execute(),
+            "Rewrite using new sequence number should fail")
+        .isInstanceOf(ValidationException.class);
+
+    // Rewrite using the starting sequence number should succeed
+    RewriteDataFilesActionResult result =
+        Actions.forTable(stale2).rewriteDataFiles().useStartingSequenceNumber(true).execute();
+
+    // Should not rewrite files from the new commit
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+    shouldHaveDataAndFileSequenceNumbers(
+        TABLE_NAME_WITH_PK, ImmutableList.of(Pair.of(3L, 3L), Pair.of(3L, 3L), Pair.of(2L, 4L)));
+
+    // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(
+        icebergTableWithPk,
+        Lists.newArrayList(
+            SimpleDataUtil.createRecord(1, "hi"), SimpleDataUtil.createRecord(2, "world")));
+  }
+
+  private void shouldHaveDataAndFileSequenceNumbers(

Review Comment:
   Added





[GitHub] [iceberg] stevenzwu merged pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu merged PR #7218:
URL: https://github.com/apache/iceberg/pull/7218




[GitHub] [iceberg] chenjunjiedada commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "chenjunjiedada (via GitHub)" <gi...@apache.org>.
chenjunjiedada commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1161380070


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -20,9 +20,12 @@
 
 import static org.apache.iceberg.flink.SimpleDataUtil.RECORD;
 
+import com.google.common.collect.Iterables;

Review Comment:
   nit: should use shaded imports.





[GitHub] [iceberg] linyanghao commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1157013722


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +395,73 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+
+    // Enabled upsert
+    icebergTableUnPartitioned.refresh();
+    upgradeToVersion2((BaseTable) icebergTableUnPartitioned);
+    icebergTableUnPartitioned
+        .updateSchema()
+        .allowIncompatibleChanges()
+        .requireColumn("id")
+        .setIdentifierFields("id")
+        .commit();
+    icebergTableUnPartitioned
+        .updateProperties()
+        .set(TableProperties.UPSERT_ENABLED, "true")
+        .commit();
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s SELECT 1, 'hi'", TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();

Review Comment:
   We need to refresh here, otherwise the files planned below will not be up-to-date.





[GitHub] [iceberg] linyanghao commented on pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#issuecomment-1491977140

   > Can you add a UT for this?
   
   Added a UT.




[GitHub] [iceberg] hililiwei commented on pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#issuecomment-1491405113

   Can you add a UT for this?




[GitHub] [iceberg] linyanghao commented on pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#issuecomment-1517332889

   > added a couple of nit comments. otherwise LGTM
   
   @stevenzwu Thanks for reviewing!




[GitHub] [iceberg] hililiwei commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1156834157


##########
core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java:
##########
@@ -198,6 +200,11 @@ public BaseRewriteDataFilesAction<ThisT> filter(Expression expr) {
     return this;
   }
 
+  public BaseRewriteDataFilesAction<ThisT> useStartingSequenceNumber(boolean useStarting) {

Review Comment:
   add doc.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +395,73 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+
+    // Enabled upsert
+    icebergTableUnPartitioned.refresh();
+    upgradeToVersion2((BaseTable) icebergTableUnPartitioned);
+    icebergTableUnPartitioned
+        .updateSchema()
+        .allowIncompatibleChanges()
+        .requireColumn("id")
+        .setIdentifierFields("id")
+        .commit();
+    icebergTableUnPartitioned
+        .updateProperties()
+        .set(TableProperties.UPSERT_ENABLED, "true")
+        .commit();
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s SELECT 1, 'hi'", TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertTrue(
+        "Should have 1 equality-delete file before rewrite",
+        deleteFiles.size() == 1
+            && Iterables.getOnlyElement(deleteFiles).content() == FileContent.EQUALITY_DELETES);
+
+    Assert.assertThrows(
+        "Rewrite using new sequence number should fail",
+        ValidationException.class,
+        () ->
+            Actions.forTable(stale1).rewriteDataFiles().useStartingSequenceNumber(false).execute());
+    // Rewrite using the starting sequence number should succeed
+    RewriteDataFilesActionResult result = Actions.forTable(stale2).rewriteDataFiles().execute();
+
+    // Should not rewrite files from the new commit
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+    icebergTableUnPartitioned.refresh();
+
+    // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(
+        icebergTableUnPartitioned,
+        Lists.newArrayList(
+            SimpleDataUtil.createRecord(1, "hi"), SimpleDataUtil.createRecord(2, "world")));
+  }
+
+  private void upgradeToVersion2(BaseTable table) {

Review Comment:
   We don't need a dedicated helper method just for this.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +395,73 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+
+    // Enabled upsert
+    icebergTableUnPartitioned.refresh();
+    upgradeToVersion2((BaseTable) icebergTableUnPartitioned);
+    icebergTableUnPartitioned
+        .updateSchema()
+        .allowIncompatibleChanges()
+        .requireColumn("id")
+        .setIdentifierFields("id")
+        .commit();

Review Comment:
   Create a table with primary keys in `before()` instead of altering the schema here.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +395,73 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+
+    // Enabled upsert
+    icebergTableUnPartitioned.refresh();
+    upgradeToVersion2((BaseTable) icebergTableUnPartitioned);
+    icebergTableUnPartitioned
+        .updateSchema()
+        .allowIncompatibleChanges()
+        .requireColumn("id")
+        .setIdentifierFields("id")
+        .commit();
+    icebergTableUnPartitioned
+        .updateProperties()
+        .set(TableProperties.UPSERT_ENABLED, "true")
+        .commit();
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s SELECT 1, 'hi'", TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertTrue(
+        "Should have 1 equality-delete file before rewrite",
+        deleteFiles.size() == 1
+            && Iterables.getOnlyElement(deleteFiles).content() == FileContent.EQUALITY_DELETES);
+
+    Assert.assertThrows(
+        "Rewrite using new sequence number should fail",
+        ValidationException.class,
+        () ->
+            Actions.forTable(stale1).rewriteDataFiles().useStartingSequenceNumber(false).execute());
+    // Rewrite using the starting sequence number should succeed
+    RewriteDataFilesActionResult result = Actions.forTable(stale2).rewriteDataFiles().execute();
+
+    // Should not rewrite files from the new commit
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+    icebergTableUnPartitioned.refresh();

Review Comment:
   This refresh isn't needed.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +395,73 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+
+    // Enabled upsert
+    icebergTableUnPartitioned.refresh();
+    upgradeToVersion2((BaseTable) icebergTableUnPartitioned);
+    icebergTableUnPartitioned
+        .updateSchema()
+        .allowIncompatibleChanges()
+        .requireColumn("id")
+        .setIdentifierFields("id")
+        .commit();
+    icebergTableUnPartitioned
+        .updateProperties()
+        .set(TableProperties.UPSERT_ENABLED, "true")
+        .commit();
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s SELECT 1, 'hi'", TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertTrue(
+        "Should have 1 equality-delete file before rewrite",
+        deleteFiles.size() == 1
+            && Iterables.getOnlyElement(deleteFiles).content() == FileContent.EQUALITY_DELETES);

Review Comment:
   Split this into two separate `Assert` calls.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +395,73 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+
+    // Enabled upsert
+    icebergTableUnPartitioned.refresh();
+    upgradeToVersion2((BaseTable) icebergTableUnPartitioned);
+    icebergTableUnPartitioned
+        .updateSchema()
+        .allowIncompatibleChanges()
+        .requireColumn("id")
+        .setIdentifierFields("id")
+        .commit();
+    icebergTableUnPartitioned
+        .updateProperties()
+        .set(TableProperties.UPSERT_ENABLED, "true")
+        .commit();
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s SELECT 1, 'hi'", TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertTrue(
+        "Should have 1 equality-delete file before rewrite",
+        deleteFiles.size() == 1
+            && Iterables.getOnlyElement(deleteFiles).content() == FileContent.EQUALITY_DELETES);
+
+    Assert.assertThrows(

Review Comment:
   `Assertions#assertThatThrownBy` is recommended over `Assert.assertThrows`.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +395,73 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));
+
+    // Enabled upsert
+    icebergTableUnPartitioned.refresh();
+    upgradeToVersion2((BaseTable) icebergTableUnPartitioned);
+    icebergTableUnPartitioned
+        .updateSchema()
+        .allowIncompatibleChanges()
+        .requireColumn("id")
+        .setIdentifierFields("id")
+        .commit();
+    icebergTableUnPartitioned
+        .updateProperties()
+        .set(TableProperties.UPSERT_ENABLED, "true")
+        .commit();
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s SELECT 1, 'hi'", TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();

Review Comment:
   This refresh isn't needed.





[GitHub] [iceberg] linyanghao commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1153987639


##########
core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java:
##########
@@ -307,11 +314,14 @@ void doReplace(
       Iterable<DataFile> deletedDataFiles,
       Iterable<DataFile> addedDataFiles,
       long startingSnapshotId) {
-    RewriteFiles rewriteFiles =
-        table
-            .newRewrite()
-            .validateFromSnapshot(startingSnapshotId)
-            .rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+    RewriteFiles rewriteFiles = table.newRewrite().validateFromSnapshot(startingSnapshotId);
+    if (useStartingSequenceNumber) {
+      long sequenceNumber = table.snapshot(startingSnapshotId).sequenceNumber();
+      rewriteFiles.rewriteFiles(
+          Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles), sequenceNumber);
+    } else {
+      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+    }

Review Comment:
   > insert new empty line after '}'
   
   Added. Thanks for reviewing!





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1167033674


##########
core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java:
##########
@@ -198,6 +200,21 @@ public BaseRewriteDataFilesAction<ThisT> filter(Expression expr) {
     return this;
   }
 
+  /**
+   * If the compaction should use the sequence number of the snapshot at compaction start time for
+   * new data files, instead of using the sequence number of the newly produced snapshot.
+   *
+   * <p>This avoids commit conflicts with updates that add newer equality deletes at a higher
+   * sequence number.
+   *
+   * @param useStarting use starting sequence number if set to true
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> useStartingSequenceNumber(boolean useStarting) {

Review Comment:
   It seems that `BaseRewriteDataFilesAction` is only extended by the Flink actions, so I am fine with adding this API in the core module as a quick fix for now. I agree that the long-term plan should be built on the new rewrite interfaces. For backward compatibility, though, should the default be `false`, with the Flink rewrite action passing in `true`?
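
A minimal sketch of the rule behind this flag, assuming the Iceberg table spec's behavior that an equality delete applies only to data files with a strictly lower data sequence number. The class `SequenceNumberSketch` and its two methods are hypothetical names for illustration; they are not part of the Iceberg API and do not reproduce the validation in `MergingSnapshotProducer`.

```java
/** Toy model of data sequence numbers during compaction; not the Iceberg API. */
final class SequenceNumberSketch {

  private SequenceNumberSketch() {}

  /**
   * Sequence number given to the compacted output file.
   * With useStarting=true the file keeps the number of the snapshot the rewrite
   * started from; otherwise it takes the number of the new commit (latest + 1).
   */
  static long rewriteSequenceNumber(boolean useStarting, long startingSeq, long latestSeq) {
    return useStarting ? startingSeq : latestSeq + 1;
  }

  /**
   * Assumed spec rule: an equality delete covers a data file only when the
   * data file's sequence number is strictly lower than the delete's.
   */
  static boolean deleteStillApplies(long dataFileSeq, long deleteSeq) {
    return dataFileSeq < deleteSeq;
  }

  public static void main(String[] args) {
    long startingSeq = 2; // snapshot the compaction read from
    long deleteSeq = 3;   // equality delete committed while the compaction ran
    long latestSeq = 3;   // table's latest sequence number at commit time

    long kept = rewriteSequenceNumber(true, startingSeq, latestSeq);
    long fresh = rewriteSequenceNumber(false, startingSeq, latestSeq);

    // The delete still covers the compacted rows when the starting number is kept.
    System.out.println("starting seq " + kept + ": " + deleteStillApplies(kept, deleteSeq));
    // With a fresh number the compacted rows would escape the delete, so the
    // commit has to be rejected as a conflict.
    System.out.println("new seq " + fresh + ": " + deleteStillApplies(fresh, deleteSeq));
  }
}
```

In this model, keeping the starting sequence number is what lets the rewrite commit alongside newer equality deletes, which is why the choice of default matters for existing callers.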



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,56 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =

Review Comment:
   Aren't `stale1` and `stale2` referencing the same snapshot in this case? Did you mean to load the table after each insert statement above?



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,56 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);
+
+    Assertions.assertThatThrownBy(
+            () ->
+                Actions.forTable(stale1)
+                    .rewriteDataFiles()
+                    .useStartingSequenceNumber(false)
+                    .execute(),
+            "Rewrite using new sequence number should fail")
+        .isInstanceOf(ValidationException.class);
+    // Rewrite using the starting sequence number should succeed

Review Comment:
   nit: add a new empty line here so it's easy to see that there are two separate tests.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,56 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);

Review Comment:
   Should we also add assertions on the file sequence numbers here?





[GitHub] [iceberg] linyanghao commented on pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#issuecomment-1518498061

   > @linyanghao Please create the backport PRs for Flink 1.15 and 1.17, and check the diff for each backport PR, e.g.
   > 
   > ```
   >  git diff --no-index  flink/v1.16/flink/src/ flink/v1.17/flink/src
   > ```
   
   Sure! Will do the backport soon.




[GitHub] [iceberg] hililiwei commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1154082763


##########
core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java:
##########
@@ -198,6 +200,11 @@ public BaseRewriteDataFilesAction<ThisT> filter(Expression expr) {
     return this;
   }
 
+  public BaseRewriteDataFilesAction<ThisT> useStartingSequenceNumber(boolean useStarting) {

Review Comment:
   nit: 





[GitHub] [iceberg] linyanghao commented on pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#issuecomment-1495683750

   @hililiwei Thanks for reviewing. Made changes based on your reviews.




[GitHub] [iceberg] linyanghao commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1171702745


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,63 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    // Since the first rewrite will refresh stale1, we need another stale2 for the second rewrite
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);
+    Assert.assertEquals(
+        "The latest sequence number should be greater than that of the stale snapshot",
+        stale1.currentSnapshot().sequenceNumber() + 1,
+        icebergTableWithPk.currentSnapshot().sequenceNumber());
+
+    Assertions.assertThatThrownBy(
+            () ->
+                Actions.forTable(stale1)
+                    .rewriteDataFiles()
+                    .useStartingSequenceNumber(false)
+                    .execute(),
+            "Rewrite using new sequence number should fail")
+        .isInstanceOf(ValidationException.class);
+
+    // Rewrite using the starting sequence number should succeed
+    RewriteDataFilesActionResult result =
+        Actions.forTable(stale2).rewriteDataFiles().useStartingSequenceNumber(true).execute();
+
+    // Should not rewrite files from the new commit
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());

Review Comment:
   Added.





[GitHub] [iceberg] hililiwei commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1153005364


##########
core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java:
##########
@@ -307,11 +314,14 @@ void doReplace(
       Iterable<DataFile> deletedDataFiles,
       Iterable<DataFile> addedDataFiles,
       long startingSnapshotId) {
-    RewriteFiles rewriteFiles =
-        table
-            .newRewrite()
-            .validateFromSnapshot(startingSnapshotId)
-            .rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+    RewriteFiles rewriteFiles = table.newRewrite().validateFromSnapshot(startingSnapshotId);
+    if (useStartingSequenceNumber) {
+      long sequenceNumber = table.snapshot(startingSnapshotId).sequenceNumber();
+      rewriteFiles.rewriteFiles(
+          Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles), sequenceNumber);
+    } else {
+      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+    }

Review Comment:
   insert new empty line after '}'





[GitHub] [iceberg] linyanghao commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1161569348


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -20,9 +20,12 @@
 
 import static org.apache.iceberg.flink.SimpleDataUtil.RECORD;
 
+import com.google.common.collect.Iterables;

Review Comment:
   Updated. Thanks for reviewing!





[GitHub] [iceberg] stevenzwu commented on pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#issuecomment-1518120856

   Thanks @linyanghao for the contribution, and @hililiwei and @chenjunjiedada for the reviews.




[GitHub] [iceberg] linyanghao commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1169106038


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,56 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =

Review Comment:
   They do reference the same snapshot. The test performs two rewrites: one without the starting sequence number and one with it. The first rewrite refreshes the table when it tries to commit, so stale1 is no longer stale afterwards. We therefore need a second handle, stale2, for the second rewrite.
   I'm adding some comments to make this clear.
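
   To illustrate why two handles are needed, here is a toy model of the behaviour described in this comment. It is only a sketch: `SharedTable`, `Handle`, and `tryCommit` are hypothetical stand-ins and not Iceberg classes, and the assumption that a commit attempt refreshes the cached metadata even when validation fails is taken from the explanation above rather than from the Iceberg source.

```java
import java.util.concurrent.atomic.AtomicLong;

public final class StaleHandleSketch {
  private StaleHandleSketch() {}

  // Hypothetical stand-in for the catalog's view of the table: only the latest sequence number.
  static final class SharedTable {
    final AtomicLong currentSeq = new AtomicLong();
  }

  // Hypothetical stand-in for a loaded table object that caches metadata until refreshed.
  static final class Handle {
    private final SharedTable shared;
    private long cachedSeq;

    Handle(SharedTable shared) {
      this.shared = shared;
      this.cachedSeq = shared.currentSeq.get();
    }

    long cachedSeq() {
      return cachedSeq;
    }

    // Assumption: a commit attempt refreshes the cached metadata first,
    // whether or not validation later lets the commit through.
    boolean tryCommit(boolean validationPasses) {
      cachedSeq = shared.currentSeq.get();
      if (!validationPasses) {
        return false;
      }
      cachedSeq = shared.currentSeq.incrementAndGet();
      return true;
    }
  }

  public static void main(String[] args) {
    SharedTable shared = new SharedTable();
    shared.currentSeq.set(2); // state after the two plain inserts

    Handle stale1 = new Handle(shared);
    Handle stale2 = new Handle(shared);

    shared.currentSeq.set(3); // the upsert commits behind both handles

    stale1.tryCommit(false); // first rewrite: validation fails, but the handle refreshed

    System.out.println("stale1 sees sequence number " + stale1.cachedSeq());
    System.out.println("stale2 sees sequence number " + stale2.cachedSeq());
  }
}
```

   In this model only stale2 still points at the older state after the failed attempt, which is why the second rewrite in the test cannot reuse stale1.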





[GitHub] [iceberg] linyanghao commented on pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#issuecomment-1495593731

   > Left a few comments. I also found that we already have another interface, `org.apache.iceberg.actions.RewriteDataFiles`, which Spark uses. Can Flink use this class the same way Spark does?
   
   Using the new interface would require a fair amount of refactoring. Should we do that in a separate PR later?




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7218: Flink: Use starting sequence number by default when rewriting data files

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7218:
URL: https://github.com/apache/iceberg/pull/7218#discussion_r1169291646


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,63 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    // Since the first rewrite will refresh stale1, we need another stale2 for the second rewrite
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);
+    Assert.assertEquals(

Review Comment:
   nit: it might be a little easier to read if this assertion were moved to immediately after the `refresh` on line 422.
   
   Then add an empty line before the more in-depth assertions.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,63 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    // Since the first rewrite will refresh stale1, we need another stale2 for the second rewrite
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);
+    Assert.assertEquals(
+        "The latest sequence number should be greater than that of the stale snapshot",
+        stale1.currentSnapshot().sequenceNumber() + 1,
+        icebergTableWithPk.currentSnapshot().sequenceNumber());
+
+    Assertions.assertThatThrownBy(
+            () ->
+                Actions.forTable(stale1)
+                    .rewriteDataFiles()
+                    .useStartingSequenceNumber(false)
+                    .execute(),
+            "Rewrite using new sequence number should fail")
+        .isInstanceOf(ValidationException.class);
+
+    // Rewrite using the starting sequence number should succeed
+    RewriteDataFilesActionResult result =
+        Actions.forTable(stale2).rewriteDataFiles().useStartingSequenceNumber(true).execute();
+
+    // Should not rewrite files from the new commit
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());

Review Comment:
   Could we also verify the sequence numbers of the remaining and new data files?
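
   A sketch of why the sequence numbers are worth asserting. It models the rule from the Iceberg table spec that an equality delete applies only to data files with a strictly smaller data sequence number. The class and method names are hypothetical, and the concrete numbers (2, 3, 4) are assumptions that mirror the commits in this test: two inserts, one upsert, then the rewrite.

```java
public final class SequenceNumberSketch {
  private SequenceNumberSketch() {}

  // An equality delete applies only to data files with a strictly smaller data sequence number.
  static boolean equalityDeleteApplies(long dataFileSeq, long deleteFileSeq) {
    return dataFileSeq < deleteFileSeq;
  }

  // Sequence number assigned to the rewritten files under each mode (hypothetical helper).
  static long rewrittenFileSeq(boolean useStartingSequenceNumber, long startingSeq, long nextSeq) {
    return useStartingSequenceNumber ? startingSeq : nextSeq;
  }

  public static void main(String[] args) {
    long startingSeq = 2; // snapshot the rewrite was planned from
    long deleteSeq = 3;   // upsert commit that added the equality delete
    long nextSeq = 4;     // sequence number a fresh rewrite commit would receive

    long keptSeq = rewrittenFileSeq(true, startingSeq, nextSeq);
    long freshSeq = rewrittenFileSeq(false, startingSeq, nextSeq);

    // With the starting sequence number, the delete still covers the rewritten rows.
    System.out.println("starting seq: delete applies = " + equalityDeleteApplies(keptSeq, deleteSeq));
    // With a new sequence number, the delete would silently stop applying,
    // which is the situation the conflict validation has to reject.
    System.out.println("new seq: delete applies = " + equalityDeleteApplies(freshSeq, deleteSeq));
  }
}
```

   Under these assumptions, an assertion that the new data file carries the stale snapshot's sequence number, while the file from the upsert keeps its own, would pin down exactly the behaviour this PR relies on.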



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java:
##########
@@ -386,4 +402,56 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     expected.add(SimpleDataUtil.createRecord(2, "b"));
     SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
   }
+
+  @Test
+  public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
+    // Add 2 data files
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);
+
+    // Load 2 stale tables to pass to rewrite actions
+    Table stale1 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+    Table stale2 =
+        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
+
+    // Add 1 data file and 1 equality-delete file
+    sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
+
+    icebergTableWithPk.refresh();
+    CloseableIterable<FileScanTask> tasks = icebergTableWithPk.newScan().planFiles();
+    List<DataFile> dataFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Set<DeleteFile> deleteFiles =
+        Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
+    Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
+    Assert.assertSame(
+        "The 1 delete file should be an equality-delete file",
+        Iterables.getOnlyElement(deleteFiles).content(),
+        FileContent.EQUALITY_DELETES);

Review Comment:
   I mean adding assertions on the file sequence numbers before the rewrite.


