You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/11/01 19:25:03 UTC

[iceberg] 04/04: Core: Validate concurrently added delete files in OverwriteFiles (#3199)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.12.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 85d9cdac68041321ba0b4ed6e23fd380eddf0096
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Oct 1 11:48:43 2021 -0700

    Core: Validate concurrently added delete files in OverwriteFiles (#3199)
---
 .../java/org/apache/iceberg/OverwriteFiles.java    |  63 ++++-
 .../org/apache/iceberg/BaseOverwriteFiles.java     |  50 +++-
 .../iceberg/TestOverwriteWithValidation.java       | 254 +++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkWrite.java    |  13 +-
 4 files changed, 371 insertions(+), 9 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
index e2f3d62..e85545a 100644
--- a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
@@ -107,23 +107,28 @@ public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
   OverwriteFiles caseSensitive(boolean caseSensitive);
 
   /**
-   * Enables validation that files added concurrently do not conflict with this commit's operation.
+   * Enables validation that data files added concurrently do not conflict with this commit's operation.
    * <p>
-   * This method should be called when the table is queried to determine which files to delete/append.
+   * This method should be called while committing non-idempotent overwrite operations.
    * If a concurrent operation commits a new file after the data was read and that file might
    * contain rows matching the specified conflict detection filter, the overwrite operation
-   * will detect this during retries and fail.
+   * will detect this and fail.
    * <p>
    * Calling this method with a correct conflict detection filter is required to maintain
-   * serializable isolation for eager update/delete operations. Otherwise, the isolation level
+   * serializable isolation for overwrite operations. Otherwise, the isolation level
    * will be snapshot isolation.
    * <p>
    * Validation applies to files added to the table since the snapshot passed to {@link #validateFromSnapshot(long)}.
    *
    * @param conflictDetectionFilter an expression on rows in the table
    * @return this for method chaining
+   * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #conflictDetectionFilter(Expression)} and
+   *             {@link #validateNoConflictingData()} instead.
    */
-  OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter);
+  @Deprecated
+  default OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter) {
+    return conflictDetectionFilter(conflictDetectionFilter).validateNoConflictingData();
+  }
 
   /**
    * Enables validation that files added concurrently do not conflict with this commit's operation.
@@ -145,4 +150,52 @@ public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
    */
   @Deprecated
   OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);
+
+  /**
+   * Sets a conflict detection filter used to validate concurrently added data and delete files.
+   *
+   * @param conflictDetectionFilter an expression on rows in the table
+   * @return this for method chaining
+   */
+  OverwriteFiles conflictDetectionFilter(Expression conflictDetectionFilter);
+
+  /**
+   * Enables validation that data added concurrently does not conflict with this commit's operation.
+   * <p>
+   * This method should be called while committing non-idempotent overwrite operations.
+   * If a concurrent operation commits a new file after the data was read and that file might
+   * contain rows matching the specified conflict detection filter, the overwrite operation
+   * will detect this and fail.
+   * <p>
+   * Calling this method with a correct conflict detection filter is required to maintain
+   * isolation for non-idempotent overwrite operations.
+   * <p>
+   * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
+   * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+   * If the conflict detection filter is not set, any new data added concurrently will fail this
+   * overwrite operation.
+   *
+   * @return this for method chaining
+   */
+  OverwriteFiles validateNoConflictingData();
+
+  /**
+   * Enables validation that deletes that happened concurrently do not conflict with this commit's operation.
+   * <p>
+   * Validating concurrent deletes is required during non-idempotent overwrite operations.
+   * If a concurrent operation deletes data in one of the files being overwritten, the overwrite
+   * operation must be aborted as it may undelete rows that were removed concurrently.
+   * <p>
+   * Calling this method with a correct conflict detection filter is required to maintain
+   * isolation for non-idempotent overwrite operations.
+   * <p>
+   * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
+   * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+   * If the conflict detection filter is not set, this operation will use the row filter provided
+   * in {@link #overwriteByRowFilter(Expression)} to check for new delete files and will ensure
+   * there are no conflicting deletes for data files removed via {@link #deleteFile(DataFile)}.
+   *
+   * @return this for method chaining
+   */
+  OverwriteFiles validateNoConflictingDeletes();
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
index e1fbb0f..b95fa6b 100644
--- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
@@ -19,17 +19,23 @@
 
 package org.apache.iceberg;
 
+import java.util.Set;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.expressions.StrictMetricsEvaluator;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
 public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles> implements OverwriteFiles {
+  private final Set<DataFile> deletedDataFiles = Sets.newHashSet();
   private boolean validateAddedFilesMatchOverwriteFilter = false;
   private Long startingSnapshotId = null;
   private Expression conflictDetectionFilter = null;
+  private boolean validateNewDataFiles = false;
+  private boolean validateNewDeleteFiles = false;
   private boolean caseSensitive = true;
 
   protected BaseOverwriteFiles(String tableName, TableOperations ops) {
@@ -60,6 +66,7 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
 
   @Override
   public OverwriteFiles deleteFile(DataFile file) {
+    deletedDataFiles.add(file);
     delete(file);
     return this;
   }
@@ -93,9 +100,22 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
   }
 
   @Override
-  public OverwriteFiles validateNoConflictingAppends(Expression newConflictDetectionFilter) {
+  public OverwriteFiles conflictDetectionFilter(Expression newConflictDetectionFilter) {
     Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null");
     this.conflictDetectionFilter = newConflictDetectionFilter;
+    return this;
+  }
+
+  @Override
+  public OverwriteFiles validateNoConflictingData() {
+    this.validateNewDataFiles = true;
+    failMissingDeletePaths();
+    return this;
+  }
+
+  @Override
+  public OverwriteFiles validateNoConflictingDeletes() {
+    this.validateNewDeleteFiles = true;
     failMissingDeletePaths();
     return this;
   }
@@ -127,8 +147,32 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
       }
     }
 
-    if (conflictDetectionFilter != null && base.currentSnapshot() != null) {
-      validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
+
+    if (validateNewDataFiles) {
+      validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter(), caseSensitive);
+    }
+
+    if (validateNewDeleteFiles) {
+      if (rowFilter() != Expressions.alwaysFalse()) {
+        Expression filter = conflictDetectionFilter != null ? conflictDetectionFilter : rowFilter();
+        validateNoNewDeleteFiles(base, startingSnapshotId, filter, caseSensitive);
+      }
+
+      if (deletedDataFiles.size() > 0) {
+        validateNoNewDeletesForDataFiles(
+            base, startingSnapshotId, conflictDetectionFilter,
+            deletedDataFiles, caseSensitive);
+      }
+    }
+  }
+
+  private Expression dataConflictDetectionFilter() {
+    if (conflictDetectionFilter != null) {
+      return conflictDetectionFilter;
+    } else if (rowFilter() != Expressions.alwaysFalse() && deletedDataFiles.isEmpty()) {
+      return rowFilter();
+    } else {
+      return Expressions.alwaysTrue();
     }
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java b/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java
index 2635886..3d2018f 100644
--- a/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java
+++ b/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -71,6 +72,14 @@ public class TestOverwriteWithValidation extends TableTestBase {
       ))
       .build();
 
+  private static final DeleteFile FILE_DAY_1_POS_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC)
+      .ofPositionDeletes()
+      .withPath("/path/to/data-1-deletes.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("date=2018-06-08")
+      .withRecordCount(1)
+      .build();
+
   private static final DataFile FILE_DAY_2 = DataFiles
       .builder(PARTITION_SPEC)
       .withPath("/path/to/data-2.parquet")
@@ -85,6 +94,22 @@ public class TestOverwriteWithValidation extends TableTestBase {
       ))
       .build();
 
+  private static final DeleteFile FILE_DAY_2_EQ_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC)
+      .ofEqualityDeletes()
+      .withPath("/path/to/data-2-eq-deletes.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("date=2018-06-09")
+      .withRecordCount(1)
+      .build();
+
+  private static final DeleteFile FILE_DAY_2_POS_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC)
+      .ofPositionDeletes()
+      .withPath("/path/to/data-2-deletes.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("date=2018-06-09")
+      .withRecordCount(1)
+      .build();
+
   private static final DataFile FILE_DAY_2_MODIFIED = DataFiles
       .builder(PARTITION_SPEC)
       .withPath("/path/to/data-3.parquet")
@@ -113,6 +138,21 @@ public class TestOverwriteWithValidation extends TableTestBase {
       ))
       .build();
 
+  private static final DeleteFile FILE_DAY_2_ANOTHER_RANGE_EQ_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC)
+      .ofEqualityDeletes()
+      .withPath("/path/to/data-3-eq-deletes.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("date=2018-06-09")
+      .withRecordCount(1)
+      .withMetrics(new Metrics(1L,
+          null, // no column sizes
+          ImmutableMap.of(1, 1L, 2, 1L), // value count
+          ImmutableMap.of(1, 0L, 2, 0L), // null count
+          ImmutableMap.of(1, longToBuffer(10L)), // lower bounds
+          ImmutableMap.of(1, longToBuffer(10L)) // upper bounds
+      ))
+      .build();
+
   private static final Expression EXPRESSION_DAY_2 = equal("date", "2018-06-09");
 
   private static final Expression EXPRESSION_DAY_2_ID_RANGE = and(
@@ -611,4 +651,218 @@ public class TestOverwriteWithValidation extends TableTestBase {
     Assert.assertEquals("Should not create a new snapshot",
         committedSnapshotId, table.currentSnapshot().snapshotId());
   }
+
+  @Test
+  public void testConcurrentConflictingPositionDeletes() {
+    Assume.assumeTrue(formatVersion == 2);
+
+    Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+    table.newAppend()
+        .appendFile(FILE_DAY_1)
+        .appendFile(FILE_DAY_2)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    OverwriteFiles overwrite = table.newOverwrite()
+        .deleteFile(FILE_DAY_2)
+        .addFile(FILE_DAY_2_MODIFIED)
+        .validateFromSnapshot(firstSnapshot.snapshotId())
+        .conflictDetectionFilter(EXPRESSION_DAY_2)
+        .validateNoConflictingData()
+        .validateNoConflictingDeletes();
+
+    table.newRowDelta()
+        .addDeletes(FILE_DAY_2_POS_DELETES)
+        .commit();
+
+    AssertHelpers.assertThrows("Should reject commit",
+        ValidationException.class, "found new delete",
+        overwrite::commit);
+  }
+
+  @Test
+  public void testConcurrentConflictingPositionDeletesOverwriteByFilter() {
+    Assume.assumeTrue(formatVersion == 2);
+
+    Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+    table.newAppend()
+        .appendFile(FILE_DAY_1)
+        .appendFile(FILE_DAY_2)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    OverwriteFiles overwrite = table.newOverwrite()
+        .overwriteByRowFilter(EXPRESSION_DAY_2)
+        .addFile(FILE_DAY_2_MODIFIED)
+        .validateFromSnapshot(firstSnapshot.snapshotId())
+        .conflictDetectionFilter(EXPRESSION_DAY_2)
+        .validateNoConflictingData()
+        .validateNoConflictingDeletes();
+
+    table.newRowDelta()
+        .addDeletes(FILE_DAY_2_POS_DELETES)
+        .commit();
+
+    AssertHelpers.assertThrows("Should reject commit",
+        ValidationException.class, "Found new conflicting delete",
+        overwrite::commit);
+  }
+
+  @Test
+  public void testConcurrentNonConflictingPositionDeletes() {
+    Assume.assumeTrue(formatVersion == 2);
+
+    Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+    table.newAppend()
+        .appendFile(FILE_DAY_1)
+        .appendFile(FILE_DAY_2)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    OverwriteFiles overwrite = table.newOverwrite()
+        .deleteFile(FILE_DAY_2)
+        .addFile(FILE_DAY_2_MODIFIED)
+        .validateFromSnapshot(firstSnapshot.snapshotId())
+        .conflictDetectionFilter(EXPRESSION_DAY_2)
+        .validateNoConflictingData()
+        .validateNoConflictingDeletes();
+
+    table.newRowDelta()
+        .addDeletes(FILE_DAY_1_POS_DELETES)
+        .commit();
+
+    overwrite.commit();
+
+    validateTableFiles(table, FILE_DAY_1, FILE_DAY_2_MODIFIED);
+    validateTableDeleteFiles(table, FILE_DAY_1_POS_DELETES);
+  }
+
+  @Test
+  public void testConcurrentNonConflictingPositionDeletesOverwriteByFilter() {
+    Assume.assumeTrue(formatVersion == 2);
+
+    Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+    table.newAppend()
+        .appendFile(FILE_DAY_1)
+        .appendFile(FILE_DAY_2)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    OverwriteFiles overwrite = table.newOverwrite()
+        .overwriteByRowFilter(EXPRESSION_DAY_2)
+        .addFile(FILE_DAY_2_MODIFIED)
+        .validateFromSnapshot(firstSnapshot.snapshotId())
+        .conflictDetectionFilter(EXPRESSION_DAY_2)
+        .validateNoConflictingData()
+        .validateNoConflictingDeletes();
+
+    table.newRowDelta()
+        .addDeletes(FILE_DAY_1_POS_DELETES)
+        .commit();
+
+    overwrite.commit();
+
+    validateTableFiles(table, FILE_DAY_1, FILE_DAY_2_MODIFIED);
+    validateTableDeleteFiles(table, FILE_DAY_1_POS_DELETES);
+  }
+
+  @Test
+  public void testConcurrentConflictingEqualityDeletes() {
+    Assume.assumeTrue(formatVersion == 2);
+
+    Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+    table.newAppend()
+        .appendFile(FILE_DAY_1)
+        .appendFile(FILE_DAY_2)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    OverwriteFiles overwrite = table.newOverwrite()
+        .deleteFile(FILE_DAY_2)
+        .addFile(FILE_DAY_2_MODIFIED)
+        .validateFromSnapshot(firstSnapshot.snapshotId())
+        .conflictDetectionFilter(EXPRESSION_DAY_2)
+        .validateNoConflictingData()
+        .validateNoConflictingDeletes();
+
+    table.newRowDelta()
+        .addDeletes(FILE_DAY_2_EQ_DELETES)
+        .commit();
+
+    AssertHelpers.assertThrows("Should reject commit",
+        ValidationException.class, "found new delete",
+        overwrite::commit);
+  }
+
+  @Test
+  public void testConcurrentNonConflictingEqualityDeletes() {
+    Assume.assumeTrue(formatVersion == 2);
+
+    Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+    table.newAppend()
+        .appendFile(FILE_DAY_2)
+        .appendFile(FILE_DAY_2_ANOTHER_RANGE)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    OverwriteFiles overwrite = table.newOverwrite()
+        .deleteFile(FILE_DAY_2)
+        .addFile(FILE_DAY_2_MODIFIED)
+        .validateFromSnapshot(firstSnapshot.snapshotId())
+        .conflictDetectionFilter(EXPRESSION_DAY_2_ID_RANGE)
+        .validateNoConflictingData()
+        .validateNoConflictingDeletes();
+
+    table.newRowDelta()
+        .addDeletes(FILE_DAY_2_ANOTHER_RANGE_EQ_DELETES)
+        .commit();
+
+    overwrite.commit();
+
+    validateTableFiles(table, FILE_DAY_2_ANOTHER_RANGE, FILE_DAY_2_MODIFIED);
+    validateTableDeleteFiles(table, FILE_DAY_2_ANOTHER_RANGE_EQ_DELETES);
+  }
+
+  @Test
+  public void testOverwriteByFilterInheritsConflictDetectionFilter() {
+    Assume.assumeTrue(formatVersion == 2);
+
+    Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+    table.newAppend()
+        .appendFile(FILE_DAY_1)
+        .appendFile(FILE_DAY_2)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    OverwriteFiles overwrite = table.newOverwrite()
+        .overwriteByRowFilter(EXPRESSION_DAY_2)
+        .validateAddedFilesMatchOverwriteFilter()
+        .addFile(FILE_DAY_2_MODIFIED)
+        .validateFromSnapshot(firstSnapshot.snapshotId())
+        .validateNoConflictingData()
+        .validateNoConflictingDeletes();
+
+    table.newRowDelta()
+        .addDeletes(FILE_DAY_1_POS_DELETES)
+        .commit();
+
+    overwrite.commit();
+
+    validateTableFiles(table, FILE_DAY_1, FILE_DAY_2_MODIFIED);
+    validateTableDeleteFiles(table, FILE_DAY_1_POS_DELETES);
+  }
 }
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index a3da366..1ab63ab 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -357,7 +357,9 @@ class SparkWrite {
       }
 
       Expression conflictDetectionFilter = conflictDetectionFilter();
-      overwriteFiles.validateNoConflictingAppends(conflictDetectionFilter);
+      overwriteFiles.conflictDetectionFilter(conflictDetectionFilter);
+      overwriteFiles.validateNoConflictingData();
+      overwriteFiles.validateNoConflictingDeletes();
 
       String commitMsg = String.format(
           "overwrite of %d data files with %d new data files, scanSnapshotId: %d, conflictDetectionFilter: %s",
@@ -368,6 +370,15 @@ class SparkWrite {
     private void commitWithSnapshotIsolation(OverwriteFiles overwriteFiles,
                                              int numOverwrittenFiles,
                                              int numAddedFiles) {
+      Long scanSnapshotId = scan.snapshotId();
+      if (scanSnapshotId != null) {
+        overwriteFiles.validateFromSnapshot(scanSnapshotId);
+      }
+
+      Expression conflictDetectionFilter = conflictDetectionFilter();
+      overwriteFiles.conflictDetectionFilter(conflictDetectionFilter);
+      overwriteFiles.validateNoConflictingDeletes();
+
       String commitMsg = String.format(
           "overwrite of %d data files with %d new data files",
           numOverwrittenFiles, numAddedFiles);