Posted to issues@iceberg.apache.org by "linyanghao (via GitHub)" <gi...@apache.org> on 2023/03/31 05:35:31 UTC

[GitHub] [iceberg] linyanghao opened a new pull request, #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

linyanghao opened a new pull request, #7249:
URL: https://github.com/apache/iceberg/pull/7249

   When performing a RewriteDataFiles operation, if Iceberg finds new position-delete files that were produced after the starting snapshot of the rewrite, it checks whether these files could potentially contain deletes for the rewritten data files. If they do, then the rewrite operation fails.
   
   Currently, the check is based on the upper and lower bounds of the DELETE_FILE_PATH field of the pos-delete records. However, this approach can produce false positives, causing rewrites to fail even when there are no actual conflicts. As a result, it becomes impossible to rewrite a table when it is being written to using streaming CDC (Change Data Capture).
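
To illustrate how a bounds-only check can report a conflict that does not exist, here is a minimal sketch. It is not Iceberg's actual code: the class, method, and file paths are hypothetical, and it assumes only that the check compares a data file's path against the lexicographic lower and upper bounds recorded for the file-path column of a pos-delete file.

```java
import java.util.List;

// Hypothetical illustration; none of these names are Iceberg APIs.
final class PathBoundsCheck {
  // Returns true when dataFilePath lies within [lower, upper], meaning the bounds
  // alone cannot rule out that the delete file references this data file.
  static boolean mayContainDeletesFor(String lower, String upper, String dataFilePath) {
    return lower.compareTo(dataFilePath) <= 0 && dataFilePath.compareTo(upper) <= 0;
  }

  public static void main(String[] args) {
    // Paths actually referenced by the pos-delete file (made-up examples).
    List<String> referenced = List.of("data/00001-a.parquet", "data/00009-z.parquet");
    String lower = referenced.get(0);
    String upper = referenced.get(1);

    // An older data file being rewritten; the delete file never references it.
    String rewritten = "data/00005-m.parquet";

    // The bounds say "maybe", although the actual contents say "no".
    System.out.println(mayContainDeletesFor(lower, upper, rewritten)); // true
    System.out.println(referenced.contains(rewritten)); // false
  }
}
```

Any rewritten file whose path sorts between the two bounds is flagged, which is why a wide path range in a single pos-delete file can block a rewrite.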
   
   To address this issue, this PR proposes adding a new snapshot property, "position-deletes-within-commit-only", which will be set to "true" when CDC-writing using Flink. This property will indicate that the new pos-deletes only refer to data files within the same commit, not commits before it. When checking for conflicts during rewrites, we can then skip the commits generated by Flink CDC-writes.
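
As a rough sketch of how the proposed property might be written and read, assuming a snapshot summary can be modeled as a plain string map: the class and helper names below are hypothetical stand-ins, and only the property name comes from this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a snapshot summary modeled as a plain string map.
final class WithinCommitOnlyProperty {
  // Property name proposed in this PR.
  static final String POSITION_DELETES_WITHIN_COMMIT_ONLY = "position-deletes-within-commit-only";

  // Writer side (assumed): a CDC commit copies its summary and adds the property.
  static Map<String, String> tagCdcCommit(Map<String, String> summary) {
    Map<String, String> tagged = new HashMap<>(summary);
    tagged.put(POSITION_DELETES_WITHIN_COMMIT_ONLY, "true");
    return tagged;
  }

  // Reader side: a summary without the property is treated as "false".
  static boolean posDeletesWithinCommitOnly(Map<String, String> summary) {
    return Boolean.parseBoolean(summary.getOrDefault(POSITION_DELETES_WITHIN_COMMIT_ONLY, "false"));
  }

  public static void main(String[] args) {
    Map<String, String> cdcCommit = tagCdcCommit(Map.of("added-delete-files", "1"));
    Map<String, String> otherCommit = Map.of("added-delete-files", "1");

    System.out.println(posDeletesWithinCommitOnly(cdcCommit)); // true
    System.out.println(posDeletesWithinCommitOnly(otherCommit)); // false
  }
}
```

Treating a missing property as "false" keeps commits from writers that never set it subject to the existing validation.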
   
   By implementing this change, we can resolve the following issues:
   
   https://github.com/apache/iceberg/issues/4996
   https://github.com/apache/iceberg/issues/5397
   https://github.com/apache/iceberg/issues/6330


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a diff in pull request #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

Posted by "chenjunjiedada (via GitHub)" <gi...@apache.org>.
chenjunjiedada commented on code in PR #7249:
URL: https://github.com/apache/iceberg/pull/7249#discussion_r1156040650


##########
core/src/main/java/org/apache/iceberg/SnapshotSummary.java:
##########
@@ -58,6 +58,8 @@ public class SnapshotSummary {
   public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id";
   public static final String REPLACE_PARTITIONS_PROP = "replace-partitions";
   public static final String EXTRA_METADATA_PREFIX = "snapshot-property.";
+  public static final String POSITION_DELETES_WITHIN_COMMIT_ONLY =
+      "position-deletes-within-commit-only";

Review Comment:
   How about `no-pos-delete-apply-to-previous-data`? Or `no-pos-delete-target-previous-data`?





[GitHub] [iceberg] zhongyujiang commented on pull request #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

Posted by "zhongyujiang (via GitHub)" <gi...@apache.org>.
zhongyujiang commented on PR #7249:
URL: https://github.com/apache/iceberg/pull/7249#issuecomment-1493999658

   > This solution is similar to my earlier PRs (https://github.com/apache/iceberg/pull/4748 and https://github.com/apache/iceberg/pull/4703). That approach was deemed dangerous, so I switched to another one (https://github.com/apache/iceberg/pull/5760).
   
   I think this is different from #4748 because it doesn't let users choose to ignore pos deletes.
   Snapshots committed by other engines that contain pos deletes that may apply to historical data files will not be ignored, because their snapshot summaries will not contain 'position-deletes-within-commit-only'='true'. And according to this [comment](https://github.com/apache/iceberg/blob/7184691a60e2ac21dfd6b22fba56d2fe60f9af51/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L370), a Flink delta transaction won't commit pos deletes that can apply to historical data either. In other words, there is no risk of ignoring pos deletes that can apply to historical data, right?




[GitHub] [iceberg] zhongyujiang commented on a diff in pull request #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

Posted by "zhongyujiang (via GitHub)" <gi...@apache.org>.
zhongyujiang commented on code in PR #7249:
URL: https://github.com/apache/iceberg/pull/7249#discussion_r1155453340


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -444,6 +444,12 @@ private void validateNoNewDeletesForDataFiles(
     if (parent == null || base.formatVersion() < 2) {
       return;
     }
+    List<Snapshot> snapshots =
+        Lists.newArrayList(
+            SnapshotUtil.ancestorsBetween(
+                base.currentSnapshot().snapshotId(), startingSnapshotId, base::snapshot));
+    boolean ignorePositionDeletes =
+        snapshots.stream().allMatch(this::cannotContainPosDeletesForPreviousSnapshots);

Review Comment:
   I think `hasNoConflictingPosDeletes` might be a more appropriate name than `ignorePositionDeletes`, because this step checks that the pos deletes in the new snapshots cannot conflict.







[GitHub] [iceberg] chenjunjiedada commented on pull request #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

Posted by "chenjunjiedada (via GitHub)" <gi...@apache.org>.
chenjunjiedada commented on PR #7249:
URL: https://github.com/apache/iceberg/pull/7249#issuecomment-1494420589

   > Flink delta txn won't commit pos deletes that can be applied to historical data. That is to say, there is no risk of ignoring pos deletes that can be applied to historical data, right?
   
   There is no guarantee that Flink will always write data that way. The Flink community also provides batch APIs that would allow us to implement upsert or MERGE INTO in the Spark MoR (merge-on-read) style. But for now it is safe to depend on the current implementation.




[GitHub] [iceberg] linyanghao commented on pull request #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on PR #7249:
URL: https://github.com/apache/iceberg/pull/7249#issuecomment-1495350793

   Hi @zhongyujiang @chenjunjiedada. Thanks for reviewing. I have just made some changes based on your reviews.




[GitHub] [iceberg] linyanghao commented on a diff in pull request #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on code in PR #7249:
URL: https://github.com/apache/iceberg/pull/7249#discussion_r1155911788


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -444,6 +444,12 @@ private void validateNoNewDeletesForDataFiles(
     if (parent == null || base.formatVersion() < 2) {
       return;
     }
+    List<Snapshot> snapshots =
+        Lists.newArrayList(
+            SnapshotUtil.ancestorsBetween(
+                base.currentSnapshot().snapshotId(), startingSnapshotId, base::snapshot));
+    boolean ignorePositionDeletes =
+        snapshots.stream().allMatch(this::cannotContainPosDeletesForPreviousSnapshots);

Review Comment:
   To avoid this, we can read delete manifests only from snapshots that do not have "position-deletes-within-commit-only"="true". I can work on this change if the PR is to be merged.
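
A minimal sketch of the idea, assuming snapshots can be modeled as an id plus a summary map: the `Snap` class and helper names are hypothetical, and the real change would select delete manifests rather than snapshot ids. It contrasts the all-or-nothing `allMatch` check with filtering per snapshot.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-ins; not Iceberg's Snapshot API.
final class SnapshotFilterSketch {
  static final String PROP = "position-deletes-within-commit-only";

  static final class Snap {
    final long id;
    final Map<String, String> summary;

    Snap(long id, Map<String, String> summary) {
      this.id = id;
      this.summary = summary;
    }
  }

  static boolean withinCommitOnly(Snap snap) {
    return Boolean.parseBoolean(snap.summary.getOrDefault(PROP, "false"));
  }

  // Only snapshots lacking the property still need their pos deletes validated.
  static List<Long> snapshotsToValidate(List<Snap> snapshots) {
    return snapshots.stream()
        .filter(snap -> !withinCommitOnly(snap))
        .map(snap -> snap.id)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Snap> snapshots =
        List.of(
            new Snap(1L, Map.of(PROP, "true")), // Flink CDC commit
            new Snap(2L, Map.of()), // commit from another engine
            new Snap(3L, Map.of(PROP, "true"))); // Flink CDC commit

    // One untagged snapshot makes the all-or-nothing check fail for all three.
    System.out.println(snapshots.stream().allMatch(SnapshotFilterSketch::withinCommitOnly)); // false
    // Filtering narrows validation to the untagged snapshot only.
    System.out.println(snapshotsToValidate(snapshots)); // [2]
  }
}
```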





[GitHub] [iceberg] zhongyujiang commented on a diff in pull request #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

Posted by "zhongyujiang (via GitHub)" <gi...@apache.org>.
zhongyujiang commented on code in PR #7249:
URL: https://github.com/apache/iceberg/pull/7249#discussion_r1155423494


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -444,6 +444,12 @@ private void validateNoNewDeletesForDataFiles(
     if (parent == null || base.formatVersion() < 2) {
       return;
     }
+    List<Snapshot> snapshots =

Review Comment:
   Style: Should have a blank line after a code block.



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -444,6 +444,12 @@ private void validateNoNewDeletesForDataFiles(
     if (parent == null || base.formatVersion() < 2) {
       return;
     }
+    List<Snapshot> snapshots =
+        Lists.newArrayList(
+            SnapshotUtil.ancestorsBetween(
+                base.currentSnapshot().snapshotId(), startingSnapshotId, base::snapshot));
+    boolean ignorePositionDeletes =
+        snapshots.stream().allMatch(this::cannotContainPosDeletesForPreviousSnapshots);

Review Comment:
   I think `hasNoConflictingPosDeletes` might be more appropriate, because this step checks that the pos deletes in the new snapshots cannot conflict.



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -453,20 +459,47 @@ private void validateNoNewDeletesForDataFiles(
       // fail
       DeleteFile[] deleteFiles = deletes.forDataFile(startingSequenceNumber, dataFile);
       if (ignoreEqualityDeletes) {
-        ValidationException.check(
-            Arrays.stream(deleteFiles)
-                .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
-            "Cannot commit, found new position delete for replaced data file: %s",
-            dataFile);
+        if (!ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
+              "Cannot commit, found new position delete for replaced data file: %s",
+              dataFile);
+        }
       } else {
-        ValidationException.check(
-            deleteFiles.length == 0,
-            "Cannot commit, found new delete for replaced data file: %s",
-            dataFile);
+        if (ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.EQUALITY_DELETES),
+              "Cannot commit, found new equality delete for replaced data file: %s",
+              dataFile);
+        } else {
+          ValidationException.check(
+              deleteFiles.length == 0,
+              "Cannot commit, found new delete for replaced data file: %s",
+              dataFile);
+        }
       }
     }
   }
 
+  private boolean cannotContainPosDeletesForPreviousSnapshots(Snapshot snapshot) {
+    switch (snapshot.operation()) {
+      case DataOperations.APPEND:
+      case DataOperations.REPLACE:
+        return true;
+      case DataOperations.DELETE:
+      case DataOperations.OVERWRITE:
+        return Boolean.parseBoolean(
+            snapshot
+                .summary()
+                .getOrDefault(SnapshotSummary.POSITION_DELETES_WITHIN_COMMIT_ONLY, "false"));
+      default:
+        throw new RuntimeException(
+            String.format("Unknown data operation: %s", snapshot.operation()));

Review Comment:
   Nit: For this method, I think we can directly return false instead of throwing an exception for unknown data operations.
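
A minimal sketch of that suggestion, using stand-ins rather than the PR's code: the operation is passed as a string, the summary as a map, and the class name is hypothetical. The string literals are assumed to mirror the values of the `DataOperations` constants.

```java
import java.util.Map;

// Hypothetical stand-in for the method under review.
final class PosDeleteScope {
  static final String PROP = "position-deletes-within-commit-only";

  static boolean cannotContainPosDeletesForPreviousSnapshots(
      String operation, Map<String, String> summary) {
    switch (operation) {
      case "append":
      case "replace":
        return true;
      case "delete":
      case "overwrite":
        return Boolean.parseBoolean(summary.getOrDefault(PROP, "false"));
      default:
        // Unknown operation: stay conservative and keep validating pos deletes.
        return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(cannotContainPosDeletesForPreviousSnapshots("append", Map.of()));
    System.out.println(cannotContainPosDeletesForPreviousSnapshots("overwrite", Map.of(PROP, "true")));
    System.out.println(cannotContainPosDeletesForPreviousSnapshots("some-future-op", Map.of()));
  }
}
```

Returning `false` means an unrecognized operation falls back to the existing validation path instead of failing the commit with a runtime exception.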



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -453,20 +459,47 @@ private void validateNoNewDeletesForDataFiles(
       // fail
       DeleteFile[] deleteFiles = deletes.forDataFile(startingSequenceNumber, dataFile);
       if (ignoreEqualityDeletes) {
-        ValidationException.check(
-            Arrays.stream(deleteFiles)
-                .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
-            "Cannot commit, found new position delete for replaced data file: %s",
-            dataFile);
+        if (!ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
+              "Cannot commit, found new position delete for replaced data file: %s",
+              dataFile);
+        }
       } else {
-        ValidationException.check(
-            deleteFiles.length == 0,
-            "Cannot commit, found new delete for replaced data file: %s",
-            dataFile);
+        if (ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.EQUALITY_DELETES),
+              "Cannot commit, found new equality delete for replaced data file: %s",
+              dataFile);
+        } else {
+          ValidationException.check(
+              deleteFiles.length == 0,
+              "Cannot commit, found new delete for replaced data file: %s",
+              dataFile);
+        }
       }
     }
   }
 
+  private boolean cannotContainPosDeletesForPreviousSnapshots(Snapshot snapshot) {
+    switch (snapshot.operation()) {
+      case DataOperations.APPEND:
+      case DataOperations.REPLACE:
+        return true;
+      case DataOperations.DELETE:

Review Comment:
   I think a [`DELETE`](https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/api/src/main/java/org/apache/iceberg/DataOperations.java#L55) snapshot cannot contain pos deletes either.



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -453,20 +459,47 @@ private void validateNoNewDeletesForDataFiles(
       // fail
       DeleteFile[] deleteFiles = deletes.forDataFile(startingSequenceNumber, dataFile);
       if (ignoreEqualityDeletes) {
-        ValidationException.check(
-            Arrays.stream(deleteFiles)
-                .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
-            "Cannot commit, found new position delete for replaced data file: %s",
-            dataFile);
+        if (!ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
+              "Cannot commit, found new position delete for replaced data file: %s",
+              dataFile);
+        }
       } else {
-        ValidationException.check(
-            deleteFiles.length == 0,
-            "Cannot commit, found new delete for replaced data file: %s",
-            dataFile);
+        if (ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.EQUALITY_DELETES),
+              "Cannot commit, found new equality delete for replaced data file: %s",
+              dataFile);
+        } else {
+          ValidationException.check(
+              deleteFiles.length == 0,
+              "Cannot commit, found new delete for replaced data file: %s",
+              dataFile);
+        }

Review Comment:
   It seems we can rewrite the logic of L454-L481 like this:
   ```
   if (ignoreEq && ignorePos) {
     return;
   }
   
   ...
   
   for (DataFile dataFile : dataFiles) {
     DeleteFile[] deleteFiles = deletes.forDataFile(startingSequenceNumber, dataFile);
     if (!ignoreEq) {
       // validate that no eq delete exists in deleteFiles
     }
   
     if (!ignorePos) {
       // validate that no pos delete exists in deleteFiles
     }
   }
   ```
   Does this cover the current validation logic?
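
The restructuring above can be sketched as runnable code under some simplifying assumptions: the types here are hypothetical stand-ins (a map from data file path to the content types of its new delete files), and `IllegalStateException` takes the place of `ValidationException`.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the validation under review.
final class DeleteValidationSketch {
  enum Content {
    POSITION_DELETES,
    EQUALITY_DELETES
  }

  static void validateNoNewDeletes(
      Map<String, List<Content>> newDeletesByDataFile, boolean ignoreEq, boolean ignorePos) {
    if (ignoreEq && ignorePos) {
      return; // nothing left to validate
    }

    for (Map.Entry<String, List<Content>> entry : newDeletesByDataFile.entrySet()) {
      List<Content> deleteFiles = entry.getValue();
      if (!ignoreEq && deleteFiles.contains(Content.EQUALITY_DELETES)) {
        throw new IllegalStateException(
            "Cannot commit, found new equality delete for replaced data file: " + entry.getKey());
      }

      if (!ignorePos && deleteFiles.contains(Content.POSITION_DELETES)) {
        throw new IllegalStateException(
            "Cannot commit, found new position delete for replaced data file: " + entry.getKey());
      }
    }
  }

  public static void main(String[] args) {
    Map<String, List<Content>> deletes = Map.of("data/f1.parquet", List.of(Content.POSITION_DELETES));
    validateNoNewDeletes(deletes, false, true); // passes: pos deletes are ignored
    try {
      validateNoNewDeletes(deletes, true, false);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Assuming every delete file is either a position delete or an equality delete, the two independent checks cover the four branches in the diff: with neither flag set, requiring `deleteFiles.length == 0` is the same as requiring that no delete of either type is present. Only the error message for that case differs.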



##########
core/src/main/java/org/apache/iceberg/SnapshotSummary.java:
##########
@@ -58,6 +58,8 @@ public class SnapshotSummary {
   public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id";
   public static final String REPLACE_PARTITIONS_PROP = "replace-partitions";
   public static final String EXTRA_METADATA_PREFIX = "snapshot-property.";
+  public static final String POSITION_DELETES_WITHIN_COMMIT_ONLY =
+      "position-deletes-within-commit-only";

Review Comment:
   How about `position-deletes-only-apply-to-delta` or `position-deletes-only-apply-to-added-data`?





[GitHub] [iceberg] linyanghao commented on a diff in pull request #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

Posted by "linyanghao (via GitHub)" <gi...@apache.org>.
linyanghao commented on code in PR #7249:
URL: https://github.com/apache/iceberg/pull/7249#discussion_r1155546151


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -453,20 +459,47 @@ private void validateNoNewDeletesForDataFiles(
       // fail
       DeleteFile[] deleteFiles = deletes.forDataFile(startingSequenceNumber, dataFile);
       if (ignoreEqualityDeletes) {
-        ValidationException.check(
-            Arrays.stream(deleteFiles)
-                .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
-            "Cannot commit, found new position delete for replaced data file: %s",
-            dataFile);
+        if (!ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
+              "Cannot commit, found new position delete for replaced data file: %s",
+              dataFile);
+        }
       } else {
-        ValidationException.check(
-            deleteFiles.length == 0,
-            "Cannot commit, found new delete for replaced data file: %s",
-            dataFile);
+        if (ignorePositionDeletes) {
+          ValidationException.check(
+              Arrays.stream(deleteFiles)
+                  .noneMatch(deleteFile -> deleteFile.content() == FileContent.EQUALITY_DELETES),
+              "Cannot commit, found new equality delete for replaced data file: %s",
+              dataFile);
+        } else {
+          ValidationException.check(
+              deleteFiles.length == 0,
+              "Cannot commit, found new delete for replaced data file: %s",
+              dataFile);
+        }

Review Comment:
   That's a good point!





[GitHub] [iceberg] chenjunjiedada commented on pull request #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

Posted by "chenjunjiedada (via GitHub)" <gi...@apache.org>.
chenjunjiedada commented on PR #7249:
URL: https://github.com/apache/iceberg/pull/7249#issuecomment-1493790225

   + @aokolnychyi @flyrain You may be interested in this as well.




[GitHub] [iceberg] chenjunjiedada commented on pull request #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

Posted by "chenjunjiedada (via GitHub)" <gi...@apache.org>.
chenjunjiedada commented on PR #7249:
URL: https://github.com/apache/iceberg/pull/7249#issuecomment-1493765910

   This solution is similar to my earlier PRs (https://github.com/apache/iceberg/pull/4748 and https://github.com/apache/iceberg/pull/4703). That approach was deemed dangerous, so I switched to another one (https://github.com/apache/iceberg/pull/5760).






[GitHub] [iceberg] zhongyujiang commented on a diff in pull request #7249: Avoid conflicts between rewrite datafiles and flink CDC writes

Posted by "zhongyujiang (via GitHub)" <gi...@apache.org>.
zhongyujiang commented on code in PR #7249:
URL: https://github.com/apache/iceberg/pull/7249#discussion_r1155847420


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -444,6 +444,12 @@ private void validateNoNewDeletesForDataFiles(
     if (parent == null || base.formatVersion() < 2) {
       return;
     }
+    List<Snapshot> snapshots =
+        Lists.newArrayList(
+            SnapshotUtil.ancestorsBetween(
+                base.currentSnapshot().snapshotId(), startingSnapshotId, base::snapshot));
+    boolean ignorePositionDeletes =
+        snapshots.stream().allMatch(this::cannotContainPosDeletesForPreviousSnapshots);

Review Comment:
   Another issue that came to mind: when there are overwrite snapshots committed by other engines, this will return `false`, and the next verification step will still validate the pos deletes committed by Flink.


