You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/18 02:50:35 UTC

[GitHub] [iceberg] yyanyy commented on a change in pull request #2294: Core: Support rewriting delete files.

yyanyy commented on a change in pull request #2294:
URL: https://github.com/apache/iceberg/pull/2294#discussion_r596509652



##########
File path: core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
##########
@@ -228,6 +383,163 @@ public void testRecovery() {
     Assert.assertEquals("Only 3 manifests should exist", 3, listManifestFiles().size());
   }
 
+  @Test
+  public void testRecoverWhenRewriteBothDataAndDeleteFiles() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A)
+        .addRows(FILE_B)
+        .addRows(FILE_C)
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .commit();
+
+    long baseSnapshotId = readMetadata().currentSnapshot().snapshotId();
+    table.ops().failCommits(3);
+
+    RewriteFiles rewrite = table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES, FILE_B_DELETES),
+            ImmutableSet.of(FILE_D), ImmutableSet.of());
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(manifest1,
+        ids(pending.snapshotId()),
+        files(FILE_D),
+        statuses(ADDED));
+
+    validateManifestEntries(manifest2,
+        ids(pending.snapshotId(), baseSnapshotId, baseSnapshotId),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(DELETED, EXISTING, EXISTING));
+
+    validateDeleteManifest(manifest3,
+        seqs(2, 2),
+        ids(pending.snapshotId(), pending.snapshotId()),
+        files(FILE_A_DELETES, FILE_B_DELETES),
+        statuses(DELETED, DELETED));
+
+    rewrite.commit();
+
+    Assert.assertTrue("Should reuse new manifest", new File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new File(manifest2.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new File(manifest3.path()).exists());
+
+    TableMetadata metadata = readMetadata();
+    List<ManifestFile> committedManifests = Lists.newArrayList(manifest1, manifest2, manifest3);
+    Assert.assertTrue("Should committed the manifests",
+        metadata.currentSnapshot().allManifests().containsAll(committedManifests));

Review comment:
       nit: could do `metadata.currentSnapshot().allManifests().equals(committedManifests)`

##########
File path: core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
##########
@@ -228,6 +383,163 @@ public void testRecovery() {
     Assert.assertEquals("Only 3 manifests should exist", 3, listManifestFiles().size());
   }
 
+  @Test
+  public void testRecoverWhenRewriteBothDataAndDeleteFiles() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A)
+        .addRows(FILE_B)
+        .addRows(FILE_C)
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .commit();
+
+    long baseSnapshotId = readMetadata().currentSnapshot().snapshotId();
+    table.ops().failCommits(3);
+
+    RewriteFiles rewrite = table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES, FILE_B_DELETES),
+            ImmutableSet.of(FILE_D), ImmutableSet.of());
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(manifest1,
+        ids(pending.snapshotId()),
+        files(FILE_D),
+        statuses(ADDED));
+
+    validateManifestEntries(manifest2,
+        ids(pending.snapshotId(), baseSnapshotId, baseSnapshotId),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(DELETED, EXISTING, EXISTING));
+
+    validateDeleteManifest(manifest3,
+        seqs(2, 2),
+        ids(pending.snapshotId(), pending.snapshotId()),
+        files(FILE_A_DELETES, FILE_B_DELETES),
+        statuses(DELETED, DELETED));
+
+    rewrite.commit();
+
+    Assert.assertTrue("Should reuse new manifest", new File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new File(manifest2.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new File(manifest3.path()).exists());
+
+    TableMetadata metadata = readMetadata();
+    List<ManifestFile> committedManifests = Lists.newArrayList(manifest1, manifest2, manifest3);
+    Assert.assertTrue("Should committed the manifests",
+        metadata.currentSnapshot().allManifests().containsAll(committedManifests));
+
+    // As commit success all the manifests added with rewrite should be available.
+    Assert.assertEquals("Only 5 manifest should exist", 5, listManifestFiles().size());
+  }
+
+  @Test
+  public void testReplaceEqualityDeletesWithPositionDeletes() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A2)
+        .addDeletes(FILE_A2_DELETES)
+        .commit();
+
+    TableMetadata metadata = readMetadata();
+    long baseSnapshotId = metadata.currentSnapshot().snapshotId();
+
+    // Apply and commit the rewrite transaction.
+    RewriteFiles rewrite = table.newRewrite().rewriteFiles(
+        ImmutableSet.of(), ImmutableSet.of(FILE_A2_DELETES),
+        ImmutableSet.of(), ImmutableSet.of(FILE_B_DELETES)
+    );
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(manifest1,
+        ids(baseSnapshotId),
+        files(FILE_A2),
+        statuses(ADDED));
+
+    validateDeleteManifest(manifest2,
+        seqs(2),
+        ids(pending.snapshotId()),
+        files(FILE_B_DELETES),
+        statuses(ADDED));
+
+    validateDeleteManifest(manifest3,
+        seqs(2),
+        ids(pending.snapshotId()),
+        files(FILE_A2_DELETES),
+        statuses(DELETED));
+
+    rewrite.commit();
+
+    Assert.assertTrue("Should reuse new manifest", new File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new File(manifest2.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new File(manifest3.path()).exists());
+
+    metadata = readMetadata();
+    List<ManifestFile> committedManifests = Lists.newArrayList(manifest1, manifest2, manifest3);
+    Assert.assertTrue("Should committed the manifests",
+        metadata.currentSnapshot().allManifests().containsAll(committedManifests));

Review comment:
       nit: also `equals` here

##########
File path: api/src/main/java/org/apache/iceberg/RewriteFiles.java
##########
@@ -35,11 +36,30 @@
  */
 public interface RewriteFiles extends SnapshotUpdate<RewriteFiles> {
   /**
-   * Add a rewrite that replaces one set of files with another set that contains the same data.
+   * Add a rewrite that replaces one set of data files with another set that contains the same data.
    *
    * @param filesToDelete files that will be replaced (deleted), cannot be null or empty.
-   * @param filesToAdd files that will be added, cannot be null or empty.
+   * @param filesToAdd    files that will be added, cannot be null or empty.
    * @return this for method chaining
    */
-  RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd);
+  default RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd) {
+    return rewriteFiles(
+        filesToDelete,
+        ImmutableSet.of(),
+        filesToAdd,
+        ImmutableSet.of()
+    );
+  }
+
+  /**
+   * Add a rewrite that replaces one set of files with another set that contains the same data (format v2).

Review comment:
       nit: probably don't need to mention format v2 specifically

##########
File path: core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
##########
@@ -164,6 +207,66 @@ public void testAddAndDelete() {
     Assert.assertEquals("Only 3 manifests should exist", 3, listManifestFiles().size());
   }
 
+  @Test
+  public void testRewriteDataAndDeleteFiles() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg format v2. ", formatVersion > 1);
+    Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+    table.newRowDelta()
+        .addRows(FILE_A)
+        .addRows(FILE_B)
+        .addRows(FILE_C)
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .commit();
+
+    TableMetadata base = readMetadata();
+    Snapshot baseSnap = base.currentSnapshot();
+    long baseSnapshotId = baseSnap.snapshotId();
+    Assert.assertEquals("Should create 2 manifests for initial write", 2, baseSnap.allManifests().size());
+    List<ManifestFile> initialManifests = baseSnap.allManifests();
+
+    validateManifestEntries(initialManifests.get(0),
+        ids(baseSnapshotId, baseSnapshotId, baseSnapshotId),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(ADDED, ADDED, ADDED));
+    validateDeleteManifest(initialManifests.get(1),
+        seqs(1, 1),
+        ids(baseSnapshotId, baseSnapshotId),
+        files(FILE_A_DELETES, FILE_B_DELETES),
+        statuses(ADDED, ADDED));
+
+    // Rewrite the files.
+    Snapshot pending = table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES),
+            ImmutableSet.of(FILE_D), ImmutableSet.of())
+        .apply();
+
+    Assert.assertEquals("Should contain 3 manifest", 3, pending.allManifests().size());
+    Assert.assertFalse("Should not contain manifest from initial write",
+        pending.allManifests().containsAll(initialManifests));

Review comment:
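       I think we want to assert that no manifest from the initial write is present, e.g. via `pending.allManifests().stream().anyMatch(initialManifests::contains)`. `assertFalse(containsAll(...))` only fails when every initial manifest is retained.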

##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -40,19 +40,55 @@ protected String operation() {
     return DataOperations.REPLACE;
   }
 
+  private void verifyInputAndOutputFiles(Set<DataFile> dataFilesToDelete, Set<DeleteFile> deleteFilesToDelete,
+                                         Set<DataFile> dataFilesToAdd, Set<DeleteFile> deleteFilesToAdd) {
+    int filesToDelete = 0;
+    if (dataFilesToDelete != null) {
+      filesToDelete += dataFilesToDelete.size();
+    }
+
+    if (deleteFilesToDelete != null) {
+      filesToDelete += deleteFilesToDelete.size();
+    }
+
+    Preconditions.checkArgument(filesToDelete > 0, "Files to delete cannot be null or empty");
+
+    if (deleteFilesToDelete == null || deleteFilesToDelete.isEmpty()) {
+      // When there is no delete files in the rewrite action, data files to add cannot be null or empty.
+      Preconditions.checkArgument(dataFilesToAdd != null && dataFilesToAdd.size() > 0,
+          "Data files to add can not be null or empty because there's no delete file to rewrite");
+      Preconditions.checkArgument(deleteFilesToAdd == null || deleteFilesToAdd.isEmpty(),
+          "Delete files to add must be null or empty because there's no delete file to rewrite");
+    }
+  }
+
   @Override
-  public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd) {
-    Preconditions.checkArgument(filesToDelete != null && !filesToDelete.isEmpty(),
-        "Files to delete cannot be null or empty");
-    Preconditions.checkArgument(filesToAdd != null && !filesToAdd.isEmpty(),

Review comment:
       Looks like we are now changing the logic so that the input sets are no longer required to be non-null? For the new code, I think we can add a precondition check that all four input sets are non-null, which would save us all the null checks everywhere.

##########
File path: core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
##########
@@ -228,6 +383,163 @@ public void testRecovery() {
     Assert.assertEquals("Only 3 manifests should exist", 3, listManifestFiles().size());
   }
 
+  @Test
+  public void testRecoverWhenRewriteBothDataAndDeleteFiles() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A)
+        .addRows(FILE_B)
+        .addRows(FILE_C)
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .commit();
+
+    long baseSnapshotId = readMetadata().currentSnapshot().snapshotId();
+    table.ops().failCommits(3);
+
+    RewriteFiles rewrite = table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES, FILE_B_DELETES),
+            ImmutableSet.of(FILE_D), ImmutableSet.of());
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(manifest1,
+        ids(pending.snapshotId()),
+        files(FILE_D),
+        statuses(ADDED));
+
+    validateManifestEntries(manifest2,
+        ids(pending.snapshotId(), baseSnapshotId, baseSnapshotId),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(DELETED, EXISTING, EXISTING));
+
+    validateDeleteManifest(manifest3,
+        seqs(2, 2),
+        ids(pending.snapshotId(), pending.snapshotId()),
+        files(FILE_A_DELETES, FILE_B_DELETES),
+        statuses(DELETED, DELETED));
+
+    rewrite.commit();
+
+    Assert.assertTrue("Should reuse new manifest", new File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new File(manifest2.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new File(manifest3.path()).exists());
+
+    TableMetadata metadata = readMetadata();
+    List<ManifestFile> committedManifests = Lists.newArrayList(manifest1, manifest2, manifest3);
+    Assert.assertTrue("Should committed the manifests",
+        metadata.currentSnapshot().allManifests().containsAll(committedManifests));
+
+    // As commit success all the manifests added with rewrite should be available.
+    Assert.assertEquals("Only 5 manifest should exist", 5, listManifestFiles().size());
+  }
+
+  @Test
+  public void testReplaceEqualityDeletesWithPositionDeletes() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A2)
+        .addDeletes(FILE_A2_DELETES)
+        .commit();
+
+    TableMetadata metadata = readMetadata();
+    long baseSnapshotId = metadata.currentSnapshot().snapshotId();
+
+    // Apply and commit the rewrite transaction.
+    RewriteFiles rewrite = table.newRewrite().rewriteFiles(
+        ImmutableSet.of(), ImmutableSet.of(FILE_A2_DELETES),
+        ImmutableSet.of(), ImmutableSet.of(FILE_B_DELETES)
+    );
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(manifest1,
+        ids(baseSnapshotId),

Review comment:
       nit: should we verify that the sequence number for this is 1?

##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -40,19 +40,55 @@ protected String operation() {
     return DataOperations.REPLACE;
   }
 
+  private void verifyInputAndOutputFiles(Set<DataFile> dataFilesToDelete, Set<DeleteFile> deleteFilesToDelete,
+                                         Set<DataFile> dataFilesToAdd, Set<DeleteFile> deleteFilesToAdd) {
+    int filesToDelete = 0;
+    if (dataFilesToDelete != null) {
+      filesToDelete += dataFilesToDelete.size();
+    }
+
+    if (deleteFilesToDelete != null) {
+      filesToDelete += deleteFilesToDelete.size();
+    }
+
+    Preconditions.checkArgument(filesToDelete > 0, "Files to delete cannot be null or empty");
+
+    if (deleteFilesToDelete == null || deleteFilesToDelete.isEmpty()) {
+      // When there is no delete files in the rewrite action, data files to add cannot be null or empty.
+      Preconditions.checkArgument(dataFilesToAdd != null && dataFilesToAdd.size() > 0,
+          "Data files to add can not be null or empty because there's no delete file to rewrite");

Review comment:
       Nit: might want to update to something like "Data files to add can not be null or empty when there is no delete file to be rewritten"; the same applies to L61.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org