You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/09/13 21:25:57 UTC

[iceberg] branch master updated: Core: Optimize check for referenced data files in BaseRowDelta (#3071)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 838cc65  Core: Optimize check for referenced data files in BaseRowDelta (#3071)
838cc65 is described below

commit 838cc652273c1444155bec2e1d6029cfbdbf3ea3
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Sep 13 11:25:49 2021 -1000

    Core: Optimize check for referenced data files in BaseRowDelta (#3071)
    
    This change optimizes our check for referenced data files in BaseRowDelta by pushing down the conflict detection filter. Previously, we would open manifests even when they belonged to partitions that were irrelevant to the operation.
---
 .../main/java/org/apache/iceberg/BaseRowDelta.java |   3 +-
 .../apache/iceberg/MergingSnapshotProducer.java    |   7 +-
 .../test/java/org/apache/iceberg/TestRowDelta.java | 121 +++++++++++++++++++++
 3 files changed, 129 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
index 8a13713..4d80b01 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -91,7 +91,8 @@ class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta
   protected void validate(TableMetadata base) {
     if (base.currentSnapshot() != null) {
       if (!referencedDataFiles.isEmpty()) {
-        validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes);
+        validateDataFilesExist(
+            base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter);
       }
 
       // TODO: does this need to check new delete files?
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index f26412b..f907314 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -318,7 +318,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   @SuppressWarnings("CollectionUndefinedEquality")
   protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId,
-                                        CharSequenceSet requiredDataFiles, boolean skipDeletes) {
+                                        CharSequenceSet requiredDataFiles, boolean skipDeletes,
+                                        Expression conflictDetectionFilter) {
     // if there is no current table state, no files have been removed
     if (base.currentSnapshot() == null) {
       return;
@@ -339,6 +340,10 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
         .specsById(base.specsById())
         .ignoreExisting();
 
+    if (conflictDetectionFilter != null) {
+      matchingDeletesGroup.filterData(conflictDetectionFilter);
+    }
+
     try (CloseableIterator<ManifestEntry<DataFile>> deletes = matchingDeletesGroup.entries().iterator()) {
       if (deletes.hasNext()) {
         throw new ValidationException("Cannot commit, missing data files: %s",
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index 6a5c43c..c1dae7d 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -629,4 +630,124 @@ public class TestRowDelta extends V2TableTestBase {
         files(FILE_A_DELETES),
         statuses(Status.ADDED));
   }
+
+  @Test
+  public void testValidateDataFilesExistWithConflictDetectionFilter() {
+    // change the spec to be partitioned by data
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    // add a data file to partition A
+    DataFile dataFile1 = DataFiles.builder(table.spec())
+        .withPath("/path/to/data-a.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("data=a")
+        .withRecordCount(1)
+        .build();
+
+    table.newAppend()
+        .appendFile(dataFile1)
+        .commit();
+
+    // add a data file to partition B
+    DataFile dataFile2 = DataFiles.builder(table.spec())
+        .withPath("/path/to/data-b.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("data=b")
+        .withRecordCount(1)
+        .build();
+
+    table.newAppend()
+        .appendFile(dataFile2)
+        .commit();
+
+    // use this snapshot as the starting snapshot in rowDelta
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    // add a delete file for partition A
+    DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec())
+        .ofPositionDeletes()
+        .withPath("/path/to/data-a-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("data=a")
+        .withRecordCount(1)
+        .build();
+
+    Expression conflictDetectionFilter = Expressions.equal("data", "a");
+    RowDelta rowDelta = table.newRowDelta()
+        .addDeletes(deleteFile)
+        .validateDataFilesExist(ImmutableList.of(dataFile1.path()))
+        .validateDeletedFiles()
+        .validateFromSnapshot(baseSnapshot.snapshotId())
+        .validateNoConflictingAppends(conflictDetectionFilter);
+
+    // concurrently delete the file for partition B
+    table.newDelete()
+        .deleteFile(dataFile2)
+        .commit();
+
+    // commit the delta for partition A
+    rowDelta.commit();
+
+    Assert.assertEquals("Table should have one new delete manifest",
+        1, table.currentSnapshot().deleteManifests().size());
+    ManifestFile deletes = table.currentSnapshot().deleteManifests().get(0);
+    validateDeleteManifest(deletes,
+        seqs(4),
+        ids(table.currentSnapshot().snapshotId()),
+        files(deleteFile),
+        statuses(Status.ADDED));
+  }
+
+  @Test
+  public void testValidateDataFilesDoNotExistWithConflictDetectionFilter() {
+    // change the spec to be partitioned by data
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    // add a data file to partition A
+    DataFile dataFile1 = DataFiles.builder(table.spec())
+        .withPath("/path/to/data-a.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("data=a")
+        .withRecordCount(1)
+        .build();
+
+    table.newAppend()
+        .appendFile(dataFile1)
+        .commit();
+
+    // use this snapshot as the starting snapshot in rowDelta
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    // add a delete file for partition A
+    DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec())
+        .ofPositionDeletes()
+        .withPath("/path/to/data-a-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("data=a")
+        .withRecordCount(1)
+        .build();
+
+    Expression conflictDetectionFilter = Expressions.equal("data", "a");
+    RowDelta rowDelta = table.newRowDelta()
+        .addDeletes(deleteFile)
+        .validateDataFilesExist(ImmutableList.of(dataFile1.path()))
+        .validateDeletedFiles()
+        .validateFromSnapshot(baseSnapshot.snapshotId())
+        .validateNoConflictingAppends(conflictDetectionFilter);
+
+    // concurrently delete the file for partition A
+    table.newDelete()
+        .deleteFile(dataFile1)
+        .commit();
+
+    AssertHelpers.assertThrows("Should fail to add deletes because data file is missing",
+        ValidationException.class, "Cannot commit, missing data files",
+        rowDelta::commit);
+  }
 }