You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/11/01 19:25:03 UTC
[iceberg] 04/04: Core: Validate concurrently added delete files in
OverwriteFiles (#3199)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch 0.12.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 85d9cdac68041321ba0b4ed6e23fd380eddf0096
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Oct 1 11:48:43 2021 -0700
Core: Validate concurrently added delete files in OverwriteFiles (#3199)
---
.../java/org/apache/iceberg/OverwriteFiles.java | 63 ++++-
.../org/apache/iceberg/BaseOverwriteFiles.java | 50 +++-
.../iceberg/TestOverwriteWithValidation.java | 254 +++++++++++++++++++++
.../apache/iceberg/spark/source/SparkWrite.java | 13 +-
4 files changed, 371 insertions(+), 9 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
index e2f3d62..e85545a 100644
--- a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
@@ -107,23 +107,28 @@ public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
OverwriteFiles caseSensitive(boolean caseSensitive);
/**
- * Enables validation that files added concurrently do not conflict with this commit's operation.
+ * Enables validation that data files added concurrently do not conflict with this commit's operation.
* <p>
- * This method should be called when the table is queried to determine which files to delete/append.
+ * This method should be called while committing non-idempotent overwrite operations.
* If a concurrent operation commits a new file after the data was read and that file might
* contain rows matching the specified conflict detection filter, the overwrite operation
- * will detect this during retries and fail.
+ * will detect this and fail.
* <p>
* Calling this method with a correct conflict detection filter is required to maintain
- * serializable isolation for eager update/delete operations. Otherwise, the isolation level
+ * serializable isolation for overwrite operations. Otherwise, the isolation level
* will be snapshot isolation.
* <p>
* Validation applies to files added to the table since the snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
+ * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #conflictDetectionFilter(Expression)} and
+ * {@link #validateNoConflictingData()} instead.
*/
- OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter);
+ @Deprecated
+ default OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter) {
+ return conflictDetectionFilter(conflictDetectionFilter).validateNoConflictingData();
+ }
/**
* Enables validation that files added concurrently do not conflict with this commit's operation.
@@ -145,4 +150,52 @@ public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
*/
@Deprecated
OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);
+
+ /**
+ * Sets a conflict detection filter used to validate concurrently added data and delete files.
+ *
+ * @param conflictDetectionFilter an expression on rows in the table
+ * @return this for method chaining
+ */
+ OverwriteFiles conflictDetectionFilter(Expression conflictDetectionFilter);
+
+ /**
+ * Enables validation that data added concurrently does not conflict with this commit's operation.
+ * <p>
+ * This method should be called while committing non-idempotent overwrite operations.
+ * If a concurrent operation commits a new file after the data was read and that file might
+ * contain rows matching the specified conflict detection filter, the overwrite operation
+ * will detect this and fail.
+ * <p>
+ * Calling this method with a correct conflict detection filter is required to maintain
+ * isolation for non-idempotent overwrite operations.
+ * <p>
+ * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
+ * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+ * If the conflict detection filter is not set, any new data added concurrently will fail this
+ * overwrite operation.
+ *
+ * @return this for method chaining
+ */
+ OverwriteFiles validateNoConflictingData();
+
+ /**
+ * Enables validation that deletes that happened concurrently do not conflict with this commit's operation.
+ * <p>
+ * Validating concurrent deletes is required during non-idempotent overwrite operations.
+ * If a concurrent operation deletes data in one of the files being overwritten, the overwrite
+ * operation must be aborted as it may undelete rows that were removed concurrently.
+ * <p>
+ * Calling this method with a correct conflict detection filter is required to maintain
+ * isolation for non-idempotent overwrite operations.
+ * <p>
+ * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
+ * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+ * If the conflict detection filter is not set, this operation will use the row filter provided
+ * in {@link #overwriteByRowFilter(Expression)} to check for new delete files and will ensure
+ * there are no conflicting deletes for data files removed via {@link #deleteFile(DataFile)}.
+ *
+ * @return this for method chaining
+ */
+ OverwriteFiles validateNoConflictingDeletes();
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
index e1fbb0f..b95fa6b 100644
--- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
@@ -19,17 +19,23 @@
package org.apache.iceberg;
+import java.util.Set;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles> implements OverwriteFiles {
+ private final Set<DataFile> deletedDataFiles = Sets.newHashSet();
private boolean validateAddedFilesMatchOverwriteFilter = false;
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
+ private boolean validateNewDataFiles = false;
+ private boolean validateNewDeleteFiles = false;
private boolean caseSensitive = true;
protected BaseOverwriteFiles(String tableName, TableOperations ops) {
@@ -60,6 +66,7 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
@Override
public OverwriteFiles deleteFile(DataFile file) {
+ deletedDataFiles.add(file);
delete(file);
return this;
}
@@ -93,9 +100,22 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
}
@Override
- public OverwriteFiles validateNoConflictingAppends(Expression newConflictDetectionFilter) {
+ public OverwriteFiles conflictDetectionFilter(Expression newConflictDetectionFilter) {
Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null");
this.conflictDetectionFilter = newConflictDetectionFilter;
+ return this;
+ }
+
+ @Override
+ public OverwriteFiles validateNoConflictingData() {
+ this.validateNewDataFiles = true;
+ failMissingDeletePaths();
+ return this;
+ }
+
+ @Override
+ public OverwriteFiles validateNoConflictingDeletes() {
+ this.validateNewDeleteFiles = true;
failMissingDeletePaths();
return this;
}
@@ -127,8 +147,32 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
}
}
- if (conflictDetectionFilter != null && base.currentSnapshot() != null) {
- validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
+
+ if (validateNewDataFiles) {
+ validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter(), caseSensitive);
+ }
+
+ if (validateNewDeleteFiles) {
+ if (rowFilter() != Expressions.alwaysFalse()) {
+ Expression filter = conflictDetectionFilter != null ? conflictDetectionFilter : rowFilter();
+ validateNoNewDeleteFiles(base, startingSnapshotId, filter, caseSensitive);
+ }
+
+ if (deletedDataFiles.size() > 0) {
+ validateNoNewDeletesForDataFiles(
+ base, startingSnapshotId, conflictDetectionFilter,
+ deletedDataFiles, caseSensitive);
+ }
+ }
+ }
+
+ private Expression dataConflictDetectionFilter() {
+ if (conflictDetectionFilter != null) {
+ return conflictDetectionFilter;
+ } else if (rowFilter() != Expressions.alwaysFalse() && deletedDataFiles.isEmpty()) {
+ return rowFilter();
+ } else {
+ return Expressions.alwaysTrue();
}
}
}
diff --git a/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java b/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java
index 2635886..3d2018f 100644
--- a/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java
+++ b/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -71,6 +72,14 @@ public class TestOverwriteWithValidation extends TableTestBase {
))
.build();
+ private static final DeleteFile FILE_DAY_1_POS_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC)
+ .ofPositionDeletes()
+ .withPath("/path/to/data-1-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("date=2018-06-08")
+ .withRecordCount(1)
+ .build();
+
private static final DataFile FILE_DAY_2 = DataFiles
.builder(PARTITION_SPEC)
.withPath("/path/to/data-2.parquet")
@@ -85,6 +94,22 @@ public class TestOverwriteWithValidation extends TableTestBase {
))
.build();
+ private static final DeleteFile FILE_DAY_2_EQ_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC)
+ .ofEqualityDeletes()
+ .withPath("/path/to/data-2-eq-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("date=2018-06-09")
+ .withRecordCount(1)
+ .build();
+
+ private static final DeleteFile FILE_DAY_2_POS_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC)
+ .ofPositionDeletes()
+ .withPath("/path/to/data-2-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("date=2018-06-09")
+ .withRecordCount(1)
+ .build();
+
private static final DataFile FILE_DAY_2_MODIFIED = DataFiles
.builder(PARTITION_SPEC)
.withPath("/path/to/data-3.parquet")
@@ -113,6 +138,21 @@ public class TestOverwriteWithValidation extends TableTestBase {
))
.build();
+ private static final DeleteFile FILE_DAY_2_ANOTHER_RANGE_EQ_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC)
+ .ofEqualityDeletes()
+ .withPath("/path/to/data-3-eq-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("date=2018-06-09")
+ .withRecordCount(1)
+ .withMetrics(new Metrics(1L,
+ null, // no column sizes
+ ImmutableMap.of(1, 1L, 2, 1L), // value count
+ ImmutableMap.of(1, 0L, 2, 0L), // null count
+ ImmutableMap.of(1, longToBuffer(10L)), // lower bounds
+ ImmutableMap.of(1, longToBuffer(10L)) // upper bounds
+ ))
+ .build();
+
private static final Expression EXPRESSION_DAY_2 = equal("date", "2018-06-09");
private static final Expression EXPRESSION_DAY_2_ID_RANGE = and(
@@ -611,4 +651,218 @@ public class TestOverwriteWithValidation extends TableTestBase {
Assert.assertEquals("Should not create a new snapshot",
committedSnapshotId, table.currentSnapshot().snapshotId());
}
+
+ @Test
+ public void testConcurrentConflictingPositionDeletes() {
+ Assume.assumeTrue(formatVersion == 2);
+
+ Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+ table.newAppend()
+ .appendFile(FILE_DAY_1)
+ .appendFile(FILE_DAY_2)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+
+ OverwriteFiles overwrite = table.newOverwrite()
+ .deleteFile(FILE_DAY_2)
+ .addFile(FILE_DAY_2_MODIFIED)
+ .validateFromSnapshot(firstSnapshot.snapshotId())
+ .conflictDetectionFilter(EXPRESSION_DAY_2)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes();
+
+ table.newRowDelta()
+ .addDeletes(FILE_DAY_2_POS_DELETES)
+ .commit();
+
+ AssertHelpers.assertThrows("Should reject commit",
+ ValidationException.class, "found new delete",
+ overwrite::commit);
+ }
+
+ @Test
+ public void testConcurrentConflictingPositionDeletesOverwriteByFilter() {
+ Assume.assumeTrue(formatVersion == 2);
+
+ Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+ table.newAppend()
+ .appendFile(FILE_DAY_1)
+ .appendFile(FILE_DAY_2)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+
+ OverwriteFiles overwrite = table.newOverwrite()
+ .overwriteByRowFilter(EXPRESSION_DAY_2)
+ .addFile(FILE_DAY_2_MODIFIED)
+ .validateFromSnapshot(firstSnapshot.snapshotId())
+ .conflictDetectionFilter(EXPRESSION_DAY_2)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes();
+
+ table.newRowDelta()
+ .addDeletes(FILE_DAY_2_POS_DELETES)
+ .commit();
+
+ AssertHelpers.assertThrows("Should reject commit",
+ ValidationException.class, "Found new conflicting delete",
+ overwrite::commit);
+ }
+
+ @Test
+ public void testConcurrentNonConflictingPositionDeletes() {
+ Assume.assumeTrue(formatVersion == 2);
+
+ Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+ table.newAppend()
+ .appendFile(FILE_DAY_1)
+ .appendFile(FILE_DAY_2)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+
+ OverwriteFiles overwrite = table.newOverwrite()
+ .deleteFile(FILE_DAY_2)
+ .addFile(FILE_DAY_2_MODIFIED)
+ .validateFromSnapshot(firstSnapshot.snapshotId())
+ .conflictDetectionFilter(EXPRESSION_DAY_2)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes();
+
+ table.newRowDelta()
+ .addDeletes(FILE_DAY_1_POS_DELETES)
+ .commit();
+
+ overwrite.commit();
+
+ validateTableFiles(table, FILE_DAY_1, FILE_DAY_2_MODIFIED);
+ validateTableDeleteFiles(table, FILE_DAY_1_POS_DELETES);
+ }
+
+ @Test
+ public void testConcurrentNonConflictingPositionDeletesOverwriteByFilter() {
+ Assume.assumeTrue(formatVersion == 2);
+
+ Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+ table.newAppend()
+ .appendFile(FILE_DAY_1)
+ .appendFile(FILE_DAY_2)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+
+ OverwriteFiles overwrite = table.newOverwrite()
+ .overwriteByRowFilter(EXPRESSION_DAY_2)
+ .addFile(FILE_DAY_2_MODIFIED)
+ .validateFromSnapshot(firstSnapshot.snapshotId())
+ .conflictDetectionFilter(EXPRESSION_DAY_2)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes();
+
+ table.newRowDelta()
+ .addDeletes(FILE_DAY_1_POS_DELETES)
+ .commit();
+
+ overwrite.commit();
+
+ validateTableFiles(table, FILE_DAY_1, FILE_DAY_2_MODIFIED);
+ validateTableDeleteFiles(table, FILE_DAY_1_POS_DELETES);
+ }
+
+ @Test
+ public void testConcurrentConflictingEqualityDeletes() {
+ Assume.assumeTrue(formatVersion == 2);
+
+ Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+ table.newAppend()
+ .appendFile(FILE_DAY_1)
+ .appendFile(FILE_DAY_2)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+
+ OverwriteFiles overwrite = table.newOverwrite()
+ .deleteFile(FILE_DAY_2)
+ .addFile(FILE_DAY_2_MODIFIED)
+ .validateFromSnapshot(firstSnapshot.snapshotId())
+ .conflictDetectionFilter(EXPRESSION_DAY_2)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes();
+
+ table.newRowDelta()
+ .addDeletes(FILE_DAY_2_EQ_DELETES)
+ .commit();
+
+ AssertHelpers.assertThrows("Should reject commit",
+ ValidationException.class, "found new delete",
+ overwrite::commit);
+ }
+
+ @Test
+ public void testConcurrentNonConflictingEqualityDeletes() {
+ Assume.assumeTrue(formatVersion == 2);
+
+ Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+ table.newAppend()
+ .appendFile(FILE_DAY_2)
+ .appendFile(FILE_DAY_2_ANOTHER_RANGE)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+
+ OverwriteFiles overwrite = table.newOverwrite()
+ .deleteFile(FILE_DAY_2)
+ .addFile(FILE_DAY_2_MODIFIED)
+ .validateFromSnapshot(firstSnapshot.snapshotId())
+ .conflictDetectionFilter(EXPRESSION_DAY_2_ID_RANGE)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes();
+
+ table.newRowDelta()
+ .addDeletes(FILE_DAY_2_ANOTHER_RANGE_EQ_DELETES)
+ .commit();
+
+ overwrite.commit();
+
+ validateTableFiles(table, FILE_DAY_2_ANOTHER_RANGE, FILE_DAY_2_MODIFIED);
+ validateTableDeleteFiles(table, FILE_DAY_2_ANOTHER_RANGE_EQ_DELETES);
+ }
+
+ @Test
+ public void testOverwriteByFilterInheritsConflictDetectionFilter() {
+ Assume.assumeTrue(formatVersion == 2);
+
+ Assert.assertNull("Should be empty table", table.currentSnapshot());
+
+ table.newAppend()
+ .appendFile(FILE_DAY_1)
+ .appendFile(FILE_DAY_2)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+
+ OverwriteFiles overwrite = table.newOverwrite()
+ .overwriteByRowFilter(EXPRESSION_DAY_2)
+ .validateAddedFilesMatchOverwriteFilter()
+ .addFile(FILE_DAY_2_MODIFIED)
+ .validateFromSnapshot(firstSnapshot.snapshotId())
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes();
+
+ table.newRowDelta()
+ .addDeletes(FILE_DAY_1_POS_DELETES)
+ .commit();
+
+ overwrite.commit();
+
+ validateTableFiles(table, FILE_DAY_1, FILE_DAY_2_MODIFIED);
+ validateTableDeleteFiles(table, FILE_DAY_1_POS_DELETES);
+ }
}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index a3da366..1ab63ab 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -357,7 +357,9 @@ class SparkWrite {
}
Expression conflictDetectionFilter = conflictDetectionFilter();
- overwriteFiles.validateNoConflictingAppends(conflictDetectionFilter);
+ overwriteFiles.conflictDetectionFilter(conflictDetectionFilter);
+ overwriteFiles.validateNoConflictingData();
+ overwriteFiles.validateNoConflictingDeletes();
String commitMsg = String.format(
"overwrite of %d data files with %d new data files, scanSnapshotId: %d, conflictDetectionFilter: %s",
@@ -368,6 +370,15 @@ class SparkWrite {
private void commitWithSnapshotIsolation(OverwriteFiles overwriteFiles,
int numOverwrittenFiles,
int numAddedFiles) {
+ Long scanSnapshotId = scan.snapshotId();
+ if (scanSnapshotId != null) {
+ overwriteFiles.validateFromSnapshot(scanSnapshotId);
+ }
+
+ Expression conflictDetectionFilter = conflictDetectionFilter();
+ overwriteFiles.conflictDetectionFilter(conflictDetectionFilter);
+ overwriteFiles.validateNoConflictingDeletes();
+
String commitMsg = String.format(
"overwrite of %d data files with %d new data files",
numOverwrittenFiles, numAddedFiles);