Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/24 06:09:30 UTC

[GitHub] [iceberg] jackye1995 commented on a change in pull request #3069: Core: Validate conflicting delete files in RowDelta and OverwriteFiles

jackye1995 commented on a change in pull request #3069:
URL: https://github.com/apache/iceberg/pull/3069#discussion_r715325953



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -375,6 +376,14 @@ private void commitWithSerializableIsolation(OverwriteFiles overwriteFiles,
     private void commitWithSnapshotIsolation(OverwriteFiles overwriteFiles,
                                              int numOverwrittenFiles,
                                              int numAddedFiles) {
+      Long scanSnapshotId = scan.snapshotId();
+      if (scanSnapshotId != null) {
+        overwriteFiles.validateFromSnapshot(scanSnapshotId);
+      }
+
+      Expression conflictDetectionFilter = conflictDetectionFilter();

Review comment:
       nit: L384 and L385 can be combined, since `conflictDetectionFilter` is only used once

##########
File path: core/src/main/java/org/apache/iceberg/BaseRowDelta.java
##########
@@ -94,9 +102,12 @@ protected void validate(TableMetadata base) {
         validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes);
       }
 
-      // TODO: does this need to check new delete files?
-      if (conflictDetectionFilter != null) {
-        validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
+      if (appendConflictDetectionFilter != null) {
+        validateAddedDataFiles(base, startingSnapshotId, appendConflictDetectionFilter, caseSensitive);
+      }
+
+      if (deleteConflictDetectionFilter != null) {
+        validateNoNewDeletes(base, startingSnapshotId, deleteConflictDetectionFilter, caseSensitive);

Review comment:
       I'm a bit late to this discussion. I read the outlined approach for optimizing this check and want to share some thoughts based on how I currently handle position deletes in my internal distribution.
   
   In my system, each position delete file contains exactly one `file_path` value. This avoids the spec's requirement to sort by file path and greatly simplifies validation during concurrent commits: a check can easily find all position deletes for a given data file and compare only the position min and max to see whether the position ranges might overlap. Of course, this cannot be applied to the general use case. It was implemented only to see what can be achieved in a closed system where all delete writers write only that specific type of position delete file.
   
   When I started compacting position delete files so that they contain multiple `file_path` values, it became very easy to get false negatives, especially in object storage mode, where the `file_path` min and max no longer carry any useful information. So, at least for the object storage use case, a secondary index with much better file-skipping ability is a must-have to make the described strategy work efficiently.
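
To make the single-`file_path` idea concrete, here is a rough, self-contained sketch of the kind of check described above. It is not the code from my distribution and it uses no Iceberg API: `SingleFileDelete`, `indexByDataFile`, and `findPotentialConflicts` are hypothetical names, and the position bounds are assumed to be inclusive.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types exist in Iceberg.
class PositionDeleteOverlapSketch {

  /** Summary of a position delete file that references exactly one data file. */
  static final class SingleFileDelete {
    final String deleteFilePath;
    final String dataFilePath; // the single file_path value stored in the delete file
    final long minPos;         // smallest deleted position (assumed inclusive)
    final long maxPos;         // largest deleted position (assumed inclusive)

    SingleFileDelete(String deleteFilePath, String dataFilePath, long minPos, long maxPos) {
      this.deleteFilePath = deleteFilePath;
      this.dataFilePath = dataFilePath;
      this.minPos = minPos;
      this.maxPos = maxPos;
    }
  }

  /** Groups concurrently committed delete files by the one data file each references. */
  static Map<String, List<SingleFileDelete>> indexByDataFile(List<SingleFileDelete> deletes) {
    Map<String, List<SingleFileDelete>> index = new HashMap<>();
    for (SingleFileDelete delete : deletes) {
      index.computeIfAbsent(delete.dataFilePath, key -> new ArrayList<>()).add(delete);
    }
    return index;
  }

  /**
   * Returns the committed deletes for dataFilePath whose position range
   * intersects [minPos, maxPos]. An empty result means no potential conflict.
   */
  static List<SingleFileDelete> findPotentialConflicts(
      Map<String, List<SingleFileDelete>> index, String dataFilePath, long minPos, long maxPos) {
    List<SingleFileDelete> conflicts = new ArrayList<>();
    for (SingleFileDelete delete : index.getOrDefault(dataFilePath, Collections.emptyList())) {
      // two inclusive ranges overlap when each one starts before the other ends
      if (delete.minPos <= maxPos && minPos <= delete.maxPos) {
        conflicts.add(delete);
      }
    }
    return conflicts;
  }
}
```

The lookup is a direct map access by data file path, which is only possible because each delete file carries one `file_path`. Once a delete file spans many paths, the per-file min and max are all that is left to prune with, which is the weakness described above.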

##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -316,6 +329,57 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin
     }
   }
 
+  /**
+   * Validates that no delete files matching a filter have been added to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the operation
+   * @param dataFilter an expression used to find new conflicting delete files
+   * @param caseSensitive whether expression evaluation should be case-sensitive
+   */
+  protected void validateNoNewDeletes(TableMetadata base, Long startingSnapshotId,
+                                      Expression dataFilter, boolean caseSensitive) {
+    // if there is no current table state, no files have been added
+    if (base.currentSnapshot() == null) {
+      return;
+    }
+
+    Pair<List<ManifestFile>, Set<Long>> history =
+        validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
+    List<ManifestFile> deleteManifests = history.first();
+
+    long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
+    DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive);
+
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching %s: %s",
+        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+  }
+
+  // use 0 as a starting seq number if the starting snapshot is not set or expired
+  private long startingSequenceNumber(TableMetadata metadata, Long staringSnapshotId) {
+    if (staringSnapshotId != null && metadata.snapshot(staringSnapshotId) != null) {
+      Snapshot startingSnapshot = metadata.snapshot(staringSnapshotId);
+      return startingSnapshot.sequenceNumber();
+    } else {
+      return 0;

Review comment:
       nit: can use `TableMetadata.INITIAL_SEQUENCE_NUMBER` and remove the comment
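
Roughly what I mean, as a standalone sketch rather than a patch: the local `INITIAL_SEQUENCE_NUMBER` constant stands in for `TableMetadata.INITIAL_SEQUENCE_NUMBER` (assumed here to be 0, matching the literal in the diff), and the map of snapshot id to sequence number is a hypothetical stand-in for the `metadata.snapshot(...)` lookup.

```java
import java.util.Map;

// Hypothetical sketch: a map replaces TableMetadata so the example is self-contained.
class StartingSequenceNumberSketch {

  // stand-in for TableMetadata.INITIAL_SEQUENCE_NUMBER; the value 0 is an assumption
  static final long INITIAL_SEQUENCE_NUMBER = 0L;

  static long startingSequenceNumber(Map<Long, Long> sequenceNumbersBySnapshotId, Long startingSnapshotId) {
    if (startingSnapshotId != null) {
      Long sequenceNumber = sequenceNumbersBySnapshotId.get(startingSnapshotId);
      if (sequenceNumber != null) {
        return sequenceNumber;
      }
    }
    // the named constant covers both the unset and the expired snapshot case
    return INITIAL_SEQUENCE_NUMBER;
  }
}
```

With the constant in place, the fallback explains itself and the comment above the method becomes optional.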




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


