You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/17 16:45:44 UTC

[iceberg] branch 0.13.x updated: Core: Fix delete file handling in upgraded tables with rewritten manifests (#4514) (#4782)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/0.13.x by this push:
     new ed6b74e35 Core: Fix delete file handling in upgraded tables with rewritten manifests (#4514) (#4782)
ed6b74e35 is described below

commit ed6b74e35363a56d4e7de01ecaaa0a3692698849
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Tue May 17 18:45:38 2022 +0200

    Core: Fix delete file handling in upgraded tables with rewritten manifests (#4514) (#4782)
    
    Co-authored-by: vanliu <va...@tencent.com>
---
 .../apache/iceberg/MergingSnapshotProducer.java    |   2 +-
 .../apache/iceberg/TestV1ToV2RowDeltaDelete.java   | 246 +++++++++++++++++++++
 2 files changed, 247 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 1e1556014..f0615b902 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -518,7 +518,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
         base.schema(), current != null ? current.dataManifests() : null);
     long minDataSequenceNumber = filtered.stream()
         .map(ManifestFile::minSequenceNumber)
-        .filter(seq -> seq > 0) // filter out unassigned sequence numbers in rewritten manifests
+        .filter(seq -> seq != ManifestWriter.UNASSIGNED_SEQ) // filter out unassigned in rewritten manifests
         .reduce(base.lastSequenceNumber(), Math::min);
     deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
     List<ManifestFile> filteredDeletes = deleteFilterManager.filterManifests(
diff --git a/core/src/test/java/org/apache/iceberg/TestV1ToV2RowDeltaDelete.java b/core/src/test/java/org/apache/iceberg/TestV1ToV2RowDeltaDelete.java
new file mode 100644
index 000000000..9e27526bd
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestV1ToV2RowDeltaDelete.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Optional;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.expressions.Expressions.bucket;
+import static org.apache.iceberg.expressions.Expressions.equal;
+
+public class TestV1ToV2RowDeltaDelete extends TableTestBase {
+
+  public TestV1ToV2RowDeltaDelete() {
+    super(1 /* table format version */);
+  }
+
+  static final DeleteFile FILE_A_POS_1 = FileMetadata.deleteFileBuilder(SPEC)
+      .ofPositionDeletes()
+      .withPath("/path/to/data-a-pos-deletes.parquet")
+      .withFileSizeInBytes(10)
+      .withPartition(FILE_A.partition())
+      .withRecordCount(1)
+      .build();
+
+  static final DeleteFile FILE_A_EQ_1 = FileMetadata.deleteFileBuilder(SPEC)
+      .ofEqualityDeletes()
+      .withPath("/path/to/data-a-eq-deletes.parquet")
+      .withFileSizeInBytes(10)
+      .withPartition(FILE_A.partition())
+      .withRecordCount(1)
+      .build();
+
+  private void verifyManifestSequenceNumber(ManifestFile mf, long sequenceNum, long minSequenceNum) {
+    Assert.assertEquals("sequence number should be " + sequenceNum,
+        mf.sequenceNumber(), sequenceNum);
+    Assert.assertEquals("min sequence number should be " + minSequenceNum,
+        mf.minSequenceNumber(), minSequenceNum);
+  }
+
+  @Test
+  public void testPartitionedTableWithPartitionEqDeletes() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .commit();
+
+    List<ManifestFile> dataManifests = table.currentSnapshot().dataManifests();
+    List<ManifestFile> deleteManifests = table.currentSnapshot().deleteManifests();
+    Assert.assertEquals("Should have one data manifest file",
+        1, dataManifests.size());
+    Assert.assertEquals("Should have zero delete manifest file",
+        0, deleteManifests.size());
+    ManifestFile dataManifest = dataManifests.get(0);
+    verifyManifestSequenceNumber(dataManifest, 0, 0);
+
+    // update table version to 2
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata base = ops.current();
+    ops.commit(base, base.upgradeToFormatVersion(2));
+
+    table.newRowDelta()
+        .addDeletes(FILE_A_EQ_1)
+        .commit();
+
+    dataManifests = table.currentSnapshot().dataManifests();
+    deleteManifests = table.currentSnapshot().deleteManifests();
+    Assert.assertEquals("Should have one data manifest file",
+        1, dataManifests.size());
+    Assert.assertEquals("Should have one delete manifest file",
+        1, deleteManifests.size());
+    Assert.assertEquals(dataManifest, dataManifests.get(0)); // data manifest not changed
+    ManifestFile deleteManifest = deleteManifests.get(0);
+    verifyManifestSequenceNumber(deleteManifest, 1, 1);
+    List<FileScanTask> tasks = Lists.newArrayList(table.newScan().planFiles().iterator());
+    Assert.assertEquals("Should have three task", 3, tasks.size());
+    Optional<FileScanTask> task = tasks.stream().filter(t -> t.file().path().equals(FILE_A.path())).findFirst();
+    Assert.assertTrue(task.isPresent());
+    Assert.assertEquals("Should have one associated delete file",
+        1, task.get().deletes().size());
+    Assert.assertEquals("Should have only pos delete file",
+        FILE_A_EQ_1.path(), task.get().deletes().get(0).path());
+
+    // first commit after row-delta changes
+    table.newDelete().deleteFile(FILE_B).commit();
+
+    dataManifests = table.currentSnapshot().dataManifests();
+    deleteManifests = table.currentSnapshot().deleteManifests();
+    Assert.assertEquals("Should have one data manifest file",
+        1, dataManifests.size());
+    Assert.assertEquals("Should have one delete manifest file",
+        1, deleteManifests.size());
+    ManifestFile dataManifest2 = dataManifests.get(0);
+    verifyManifestSequenceNumber(dataManifest2, 2, 0);
+    Assert.assertNotEquals(dataManifest, dataManifest2);
+    Assert.assertEquals(deleteManifest, deleteManifests.get(0));  // delete manifest not changed
+    tasks = Lists.newArrayList(table.newScan().planFiles().iterator());
+    Assert.assertEquals("Should have two task", 2, tasks.size());
+    task = tasks.stream().filter(t -> t.file().path().equals(FILE_A.path())).findFirst();
+    Assert.assertTrue(task.isPresent());
+    Assert.assertEquals("Should have one associated delete file",
+        1, task.get().deletes().size());
+
+    // second commit after row-delta changes
+    table.newDelete().deleteFile(FILE_C).commit();
+
+    dataManifests = table.currentSnapshot().dataManifests();
+    deleteManifests = table.currentSnapshot().deleteManifests();
+    Assert.assertEquals("Should have one data manifest file",
+        1, dataManifests.size());
+    Assert.assertEquals("Should have one delete manifest file",
+        1, deleteManifests.size());
+    ManifestFile dataManifest3 = dataManifests.get(0);
+    verifyManifestSequenceNumber(dataManifest3, 3, 0);
+    Assert.assertNotEquals(dataManifest2, dataManifest3);
+    Assert.assertEquals(deleteManifest, deleteManifests.get(0));  // delete manifest not changed
+    tasks = Lists.newArrayList(table.newScan().planFiles().iterator());
+    Assert.assertEquals("Should have one task", 1, tasks.size());
+    task = tasks.stream().filter(t -> t.file().path().equals(FILE_A.path())).findFirst();
+    Assert.assertTrue(task.isPresent());
+    Assert.assertEquals("Should have one associated delete file",
+        1, task.get().deletes().size());
+  }
+
+  @Test
+  public void testPartitionedTableWithUnrelatedPartitionDeletes() {
+    table.newAppend()
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .commit();
+
+    // update table version to 2
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata base = ops.current();
+    ops.commit(base, base.upgradeToFormatVersion(2));
+
+    table.newRowDelta()
+        .addDeletes(FILE_A_POS_1)
+        .addDeletes(FILE_A_EQ_1)
+        .commit();
+
+    List<FileScanTask> tasks = Lists.newArrayList(table.newScan().planFiles().iterator());
+    Assert.assertEquals("Should have three task", 3, tasks.size());
+    Assert.assertEquals("Should have the correct data file path",
+        FILE_B.path(), tasks.get(0).file().path());
+    Assert.assertEquals("Should have zero associated delete file",
+        0, tasks.get(0).deletes().size());
+
+    table.newDelete().deleteFile(FILE_B).commit();
+    tasks = Lists.newArrayList(table.newScan().planFiles().iterator());
+    Assert.assertEquals("Should have two task", 2, tasks.size());
+    Assert.assertEquals("Should have zero associated delete file",
+        0, tasks.get(0).deletes().size());
+
+    table.newDelete().deleteFile(FILE_C).commit();
+    tasks = Lists.newArrayList(table.newScan().planFiles().iterator());
+    Assert.assertEquals("Should have one task", 1, tasks.size());
+    Assert.assertEquals("Should have zero associated delete file",
+        0, tasks.get(0).deletes().size());
+  }
+
+
+  @Test
+  public void testPartitionedTableWithExistingDeleteFile() {
+    table.updateProperties()
+        .set(TableProperties.MANIFEST_MERGE_ENABLED, "false")
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    // update table version to 2
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata base = ops.current();
+    ops.commit(base, base.upgradeToFormatVersion(2));
+
+    table.newRowDelta()
+        .addDeletes(FILE_A_EQ_1)
+        .commit();
+
+    table.newRowDelta()
+        .addDeletes(FILE_A_POS_1)
+        .commit();
+
+    table.updateProperties()
+        .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1")
+        .set(TableProperties.MANIFEST_MERGE_ENABLED, "true")
+        .commit();
+
+    Assert.assertEquals("Should have two delete manifests",
+        2, table.currentSnapshot().deleteManifests().size());
+
+    // merge delete manifests
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    Assert.assertEquals("Should have one delete manifest",
+        1, table.currentSnapshot().deleteManifests().size());
+    Assert.assertEquals("Should have zero added delete file",
+        0, table.currentSnapshot().deleteManifests().get(0).addedFilesCount().intValue());
+    Assert.assertEquals("Should have zero deleted delete file",
+        0, table.currentSnapshot().deleteManifests().get(0).deletedFilesCount().intValue());
+    Assert.assertEquals("Should have two existing delete files",
+        2, table.currentSnapshot().deleteManifests().get(0).existingFilesCount().intValue());
+
+    List<FileScanTask> tasks =
+        Lists.newArrayList(table.newScan().filter(equal(bucket("data", BUCKETS_NUMBER), 0))
+            .planFiles().iterator());
+    Assert.assertEquals("Should have one task", 1, tasks.size());
+
+    FileScanTask task = tasks.get(0);
+    Assert.assertEquals("Should have the correct data file path",
+        FILE_A.path(), task.file().path());
+    Assert.assertEquals("Should have two associated delete files",
+        2, task.deletes().size());
+    Assert.assertEquals("Should have expected delete files",
+        Sets.newHashSet(FILE_A_EQ_1.path(), FILE_A_POS_1.path()),
+        Sets.newHashSet(Iterables.transform(task.deletes(), ContentFile::path)));
+  }
+
+}