Posted to issues@iceberg.apache.org by "zhongyujiang (via GitHub)" <gi...@apache.org> on 2023/04/03 03:52:31 UTC

[GitHub] [iceberg] zhongyujiang commented on a diff in pull request #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

zhongyujiang commented on code in PR #7249:
URL: https://github.com/apache/iceberg/pull/7249#discussion_r1155423494


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -444,6 +444,12 @@ private void validateNoNewDeletesForDataFiles(
     if (parent == null || base.formatVersion() < 2) {
       return;
     }
+    List<Snapshot> snapshots =

Review Comment:
   Style: there should be a blank line after the closing brace of the `if` block.



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -444,6 +444,12 @@ private void validateNoNewDeletesForDataFiles(
     if (parent == null || base.formatVersion() < 2) {
       return;
     }
+    List<Snapshot> snapshots =
+        Lists.newArrayList(
+            SnapshotUtil.ancestorsBetween(
+                base.currentSnapshot().snapshotId(), startingSnapshotId, base::snapshot));
+    boolean ignorePositionDeletes =
+        snapshots.stream().allMatch(this::cannotContainPosDeletesForPreviousSnapshots);

Review Comment:
   I think `hasNoConflictingPosDeletes` might be a more appropriate name, because this step checks whether the position deletes in the new snapshots are guaranteed not to conflict.
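
   A minimal sketch of this aggregation step, assuming a hypothetical `SnapshotStub` class in place of `org.apache.iceberg.Snapshot` and a caller-supplied per-snapshot predicate. It uses the suggested `hasNoConflictingPosDeletes` name. Note that `Stream.allMatch` returns `true` for an empty stream, so in this sketch an empty list of new snapshots counts as having no conflicting position deletes.

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   import java.util.function.Predicate;

   // Hypothetical stand-in for org.apache.iceberg.Snapshot; only the operation name is modeled.
   final class SnapshotStub {
     final String operation;

     SnapshotStub(String operation) {
       this.operation = operation;
     }
   }

   final class PosDeleteAggregation {
     private PosDeleteAggregation() {}

     // True only if every snapshot committed after the starting snapshot passes the
     // per-snapshot check. Stream.allMatch returns true for an empty list.
     static boolean hasNoConflictingPosDeletes(
         List<SnapshotStub> newSnapshots, Predicate<SnapshotStub> cannotConflict) {
       return newSnapshots.stream().allMatch(cannotConflict);
     }

     public static void main(String[] args) {
       // Illustrative per-snapshot check only: treat append and replace as safe.
       Predicate<SnapshotStub> appendOrReplace =
           s -> s.operation.equals("append") || s.operation.equals("replace");

       List<SnapshotStub> none = Collections.emptyList();
       List<SnapshotStub> safe =
           Arrays.asList(new SnapshotStub("append"), new SnapshotStub("replace"));
       List<SnapshotStub> mixed =
           Arrays.asList(new SnapshotStub("append"), new SnapshotStub("overwrite"));

       System.out.println(hasNoConflictingPosDeletes(none, appendOrReplace)); // true
       System.out.println(hasNoConflictingPosDeletes(safe, appendOrReplace)); // true
       System.out.println(hasNoConflictingPosDeletes(mixed, appendOrReplace)); // false
     }
   }
   ```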



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -453,20 +459,47 @@ private void validateNoNewDeletesForDataFiles(
       // fail
       DeleteFile[] deleteFiles = deletes.forDataFile(startingSequenceNumber, dataFile);
       if (ignoreEqualityDeletes) {
-        ValidationException.check(
-            Arrays.stream(deleteFiles)
-                .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
-            "Cannot commit, found new position delete for replaced data file: %s",
-            dataFile);
+        if (!ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
+              "Cannot commit, found new position delete for replaced data file: %s",
+              dataFile);
+        }
       } else {
-        ValidationException.check(
-            deleteFiles.length == 0,
-            "Cannot commit, found new delete for replaced data file: %s",
-            dataFile);
+        if (ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.EQUALITY_DELETES),
+              "Cannot commit, found new equality delete for replaced data file: %s",
+              dataFile);
+        } else {
+          ValidationException.check(
+              deleteFiles.length == 0,
+              "Cannot commit, found new delete for replaced data file: %s",
+              dataFile);
+        }
       }
     }
   }
 
+  private boolean cannotContainPosDeletesForPreviousSnapshots(Snapshot snapshot) {
+    switch (snapshot.operation()) {
+      case DataOperations.APPEND:
+      case DataOperations.REPLACE:
+        return true;
+      case DataOperations.DELETE:
+      case DataOperations.OVERWRITE:
+        return Boolean.parseBoolean(
+            snapshot
+                .summary()
+                .getOrDefault(SnapshotSummary.POSITION_DELETES_WITHIN_COMMIT_ONLY, "false"));
+      default:
+        throw new RuntimeException(
+            String.format("Unknown data operation: %s", snapshot.operation()));

Review Comment:
   Nit: For this method, I think we can return false directly instead of throwing an exception for unknown data operations.
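
   A sketch of the suggested behavior, assuming the operation strings defined by `DataOperations` (`append`, `replace`, `delete`, `overwrite`) and the summary key currently proposed in this PR; the class name is hypothetical. Returning `false` for an unknown operation is the conservative choice: the caller then keeps validating position deletes instead of failing with an exception, and a missing summary property is handled the same way.

   ```java
   import java.util.Map;

   final class PosDeleteSnapshotCheck {
     private PosDeleteSnapshotCheck() {}

     // Values mirror org.apache.iceberg.DataOperations.
     static final String APPEND = "append";
     static final String REPLACE = "replace";
     static final String DELETE = "delete";
     static final String OVERWRITE = "overwrite";

     // Summary key as proposed in this PR; the final name is still under review.
     static final String POSITION_DELETES_WITHIN_COMMIT_ONLY =
         "position-deletes-within-commit-only";

     static boolean cannotContainPosDeletesForPreviousSnapshots(
         String operation, Map<String, String> summary) {
       switch (operation) {
         case APPEND:
         case REPLACE:
           return true;
         case DELETE:
         case OVERWRITE:
           // A missing property (e.g. written by an older client) parses as false.
           return Boolean.parseBoolean(
               summary.getOrDefault(POSITION_DELETES_WITHIN_COMMIT_ONLY, "false"));
         default:
           // Unknown operation: conflicting position deletes cannot be ruled out,
           // so report "may conflict" instead of throwing.
           return false;
       }
     }
   }
   ```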



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -453,20 +459,47 @@ private void validateNoNewDeletesForDataFiles(
       // fail
       DeleteFile[] deleteFiles = deletes.forDataFile(startingSequenceNumber, dataFile);
       if (ignoreEqualityDeletes) {
-        ValidationException.check(
-            Arrays.stream(deleteFiles)
-                .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
-            "Cannot commit, found new position delete for replaced data file: %s",
-            dataFile);
+        if (!ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
+              "Cannot commit, found new position delete for replaced data file: %s",
+              dataFile);
+        }
       } else {
-        ValidationException.check(
-            deleteFiles.length == 0,
-            "Cannot commit, found new delete for replaced data file: %s",
-            dataFile);
+        if (ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.EQUALITY_DELETES),
+              "Cannot commit, found new equality delete for replaced data file: %s",
+              dataFile);
+        } else {
+          ValidationException.check(
+              deleteFiles.length == 0,
+              "Cannot commit, found new delete for replaced data file: %s",
+              dataFile);
+        }
       }
     }
   }
 
+  private boolean cannotContainPosDeletesForPreviousSnapshots(Snapshot snapshot) {
+    switch (snapshot.operation()) {
+      case DataOperations.APPEND:
+      case DataOperations.REPLACE:
+        return true;
+      case DataOperations.DELETE:

Review Comment:
   I think a [`DELETE`](https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/api/src/main/java/org/apache/iceberg/DataOperations.java#L55) snapshot cannot contain position deletes either.



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -453,20 +459,47 @@ private void validateNoNewDeletesForDataFiles(
       // fail
       DeleteFile[] deleteFiles = deletes.forDataFile(startingSequenceNumber, dataFile);
       if (ignoreEqualityDeletes) {
-        ValidationException.check(
-            Arrays.stream(deleteFiles)
-                .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
-            "Cannot commit, found new position delete for replaced data file: %s",
-            dataFile);
+        if (!ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
+              "Cannot commit, found new position delete for replaced data file: %s",
+              dataFile);
+        }
       } else {
-        ValidationException.check(
-            deleteFiles.length == 0,
-            "Cannot commit, found new delete for replaced data file: %s",
-            dataFile);
+        if (ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.EQUALITY_DELETES),
+              "Cannot commit, found new equality delete for replaced data file: %s",
+              dataFile);
+        } else {
+          ValidationException.check(
+              deleteFiles.length == 0,
+              "Cannot commit, found new delete for replaced data file: %s",
+              dataFile);
+        }

Review Comment:
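   It seems we can rewrite the logic of L454-L481 like this:
   ```java
   if (ignoreEqualityDeletes && ignorePositionDeletes) {
     return;
   }

   // ...

   for (DataFile dataFile : dataFiles) {
     DeleteFile[] deleteFiles = deletes.forDataFile(startingSequenceNumber, dataFile);
     if (!ignoreEqualityDeletes) {
       ValidationException.check(
           Arrays.stream(deleteFiles)
               .noneMatch(deleteFile -> deleteFile.content() == FileContent.EQUALITY_DELETES),
           "Cannot commit, found new equality delete for replaced data file: %s",
           dataFile);
     }

     if (!ignorePositionDeletes) {
       ValidationException.check(
           Arrays.stream(deleteFiles)
               .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
           "Cannot commit, found new position delete for replaced data file: %s",
           dataFile);
     }
   }
   ```
   Does this cover the current validation logic?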
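
   One way to check this is an exhaustive comparison over a simplified model. The sketch below is hypothetical: a `Content` enum stands in for `DeleteFile`/`FileContent`, each `ValidationException.check` becomes a boolean "passes" result, and every delete file is assumed to be either a position delete or an equality delete. Under that assumption the two versions agree in all cases; when neither flag is set, rejecting both equality and position deletes is the same as requiring `deleteFiles.length == 0`.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   final class DeleteValidationEquivalence {
     private DeleteValidationEquivalence() {}

     // Simplified stand-in for FileContent; delete files are assumed to be one of these two kinds.
     enum Content { POSITION_DELETES, EQUALITY_DELETES }

     static boolean noneOf(List<Content> deleteFiles, Content kind) {
       return deleteFiles.stream().noneMatch(c -> c == kind);
     }

     // Validation as written in the PR (true means the check passes for this data file).
     static boolean current(boolean ignoreEq, boolean ignorePos, List<Content> deleteFiles) {
       if (ignoreEq) {
         return ignorePos || noneOf(deleteFiles, Content.POSITION_DELETES);
       } else if (ignorePos) {
         return noneOf(deleteFiles, Content.EQUALITY_DELETES);
       } else {
         return deleteFiles.isEmpty();
       }
     }

     // Flattened validation proposed in the review comment.
     static boolean proposed(boolean ignoreEq, boolean ignorePos, List<Content> deleteFiles) {
       if (ignoreEq && ignorePos) {
         return true;
       }
       boolean ok = true;
       if (!ignoreEq) {
         ok &= noneOf(deleteFiles, Content.EQUALITY_DELETES);
       }
       if (!ignorePos) {
         ok &= noneOf(deleteFiles, Content.POSITION_DELETES);
       }
       return ok;
     }

     // Compares both versions for every flag combination and every delete-file mix
     // (none, position only, equality only, both).
     static boolean equivalent() {
       List<List<Content>> cases = new ArrayList<List<Content>>();
       cases.add(new ArrayList<Content>());
       cases.add(Arrays.asList(Content.POSITION_DELETES));
       cases.add(Arrays.asList(Content.EQUALITY_DELETES));
       cases.add(Arrays.asList(Content.POSITION_DELETES, Content.EQUALITY_DELETES));
       for (boolean ignoreEq : new boolean[] {false, true}) {
         for (boolean ignorePos : new boolean[] {false, true}) {
           for (List<Content> deleteFiles : cases) {
             boolean a = current(ignoreEq, ignorePos, deleteFiles);
             boolean b = proposed(ignoreEq, ignorePos, deleteFiles);
             if (a != b) {
               return false;
             }
           }
         }
       }
       return true;
     }

     public static void main(String[] args) {
       System.out.println(equivalent()); // true
     }
   }
   ```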



##########
core/src/main/java/org/apache/iceberg/SnapshotSummary.java:
##########
@@ -58,6 +58,8 @@ public class SnapshotSummary {
   public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id";
   public static final String REPLACE_PARTITIONS_PROP = "replace-partitions";
   public static final String EXTRA_METADATA_PREFIX = "snapshot-property.";
+  public static final String POSITION_DELETES_WITHIN_COMMIT_ONLY =
+      "position-deletes-within-commit-only";

Review Comment:
   How about `position-deletes-only-apply-to-delta` or `position-deletes-only-apply-to-added-data`?
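
   To illustrate what the property is meant to record, here is a hypothetical writer-side sketch that is not part of this diff: a committer sets the flag to `true` only when every data file targeted by its position deletes was added in the same commit, which is the situation both suggested names describe. The helper name and the use of plain file-path strings are assumptions; the key is the one currently in the PR.

   ```java
   import java.util.Collection;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   final class PosDeleteSummaryWriter {
     private PosDeleteSummaryWriter() {}

     // Key string as currently proposed in this PR (name still under review).
     static final String POSITION_DELETES_WITHIN_COMMIT_ONLY =
         "position-deletes-within-commit-only";

     // Hypothetical helper: records whether every data file targeted by this commit's
     // position deletes is also a data file added by this same commit. With no position
     // deletes at all, containsAll on an empty collection returns true.
     static Map<String, String> summaryFor(
         Set<String> addedDataFilePaths, Collection<String> posDeleteTargetPaths) {
       boolean withinCommitOnly = addedDataFilePaths.containsAll(posDeleteTargetPaths);
       Map<String, String> summary = new HashMap<String, String>();
       summary.put(POSITION_DELETES_WITHIN_COMMIT_ONLY, Boolean.toString(withinCommitOnly));
       return summary;
     }
   }
   ```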



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

