Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/12 14:05:14 UTC

[GitHub] [iceberg] Reo-LEI opened a new pull request #3103: Flink: IcebergFilesCommitter validate dataFile exist start from last committed snapshot.

Reo-LEI opened a new pull request #3103:
URL: https://github.com/apache/iceberg/pull/3103


   This PR is trying to address the issue: https://github.com/apache/iceberg/issues/3102
   
   Now, `IcebergFilesCommitter` will only traverse the full snapshot history once when the committer starts, and afterwards it will only check the snapshot history between the last committed snapshot id and the current snapshot id.
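   
   For context, a minimal sketch of the idea (illustrative only: the class name, the helper structure and the in-memory `lastCommittedSnapshotId` field are assumptions; only the `RowDelta` calls mirror the committer's existing Iceberg API):
   
   ```
   import java.util.Arrays;
   import org.apache.iceberg.RowDelta;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.io.WriteResult;
   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
   
   // Sketch of the proposed behaviour, not the actual patch.
   class RowDeltaCommitSketch {
     private final Table table;
     // null until the first successful commit => validate against the full snapshot history once
     private Long lastCommittedSnapshotId = null;
   
     RowDeltaCommitSketch(Table table) {
       this.table = table;
     }
   
     void commit(WriteResult result) {
       RowDelta rowDelta = table.newRowDelta()
           .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
           .validateDeletedFiles();
   
       if (lastCommittedSnapshotId != null) {
         // Only the snapshots committed after this id need to be re-validated.
         rowDelta.validateFromSnapshot(lastCommittedSnapshotId);
       }
   
       Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
       Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
       rowDelta.commit();
   
       // Remember where validation may start from at the next checkpoint.
       this.lastCommittedSnapshotId = table.currentSnapshot().snapshotId();
     }
   }
   ```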


[GitHub] [iceberg] Reo-LEI commented on a change in pull request #3103: Flink: IcebergFilesCommitter validate dataFile exist start from last committed snapshot.

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #3103:
URL: https://github.com/apache/iceberg/pull/3103#discussion_r718347222



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -283,6 +287,7 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, Stri
         // merged one will lead to the incorrect delete semantic.
         WriteResult result = e.getValue();
         RowDelta rowDelta = table.newRowDelta()
+            .validateFromSnapshot(lastCommittedSnapshotId)

Review comment:
       > but with your PR we can run the snapshot expire task with the flink job running since you are updating the lastCommittedSnapshotId.
   
   I think the reason the snapshot expire task doesn't cause the validation error is that the `validationHistory` will only traverse back to the `lastCommittedSnapshot` and stop validating the removed snapshots that are older than the `lastCommittedSnapshot`. But once the `lastCommittedSnapshot` itself is removed (e.g. the snapshot expire task runs multiple times between two checkpoints), you will encounter the validation error again.
   
   I think storing the `lastCommittedSnapshotId` is not the correct way to resolve the validation error (#2482), because we can't guarantee that the `lastCommittedSnapshot` whose snapshot id we store will still exist when we restore the flink job. The `lastCommittedSnapshot` may already have been removed by the time we restore the flink job, and the validation error will be raised again. @ayush-san
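   
   A small illustrative model of the traversal described above (assumed names, not Iceberg's actual validation code): the walk follows parent ids from the current snapshot back to the starting snapshot, so it only succeeds while that starting snapshot is still present in the table metadata.
   
   ```
   import org.apache.iceberg.Snapshot;
   import org.apache.iceberg.Table;
   
   class ValidationHistorySketch {
     // Returns true if the history from the current snapshot back to startingSnapshotId
     // can still be resolved, false once the starting snapshot (or an ancestor on the
     // way to it) has been removed by snapshot expiration.
     static boolean canValidateFrom(Table table, long startingSnapshotId) {
       Snapshot current = table.currentSnapshot();
       while (current != null) {
         if (current.snapshotId() == startingSnapshotId) {
           return true;  // reached the starting snapshot before hitting an expired gap
         }
         Long parentId = current.parentId();
         current = parentId == null ? null : table.snapshot(parentId);  // null if the parent was expired
       }
       return false;  // e.g. the expire task ran more than once between two checkpoints
     }
   }
   ```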




[GitHub] [iceberg] ayush-san commented on a change in pull request #3103: Flink: IcebergFilesCommitter validate dataFile exist start from last committed snapshot.

Posted by GitBox <gi...@apache.org>.
ayush-san commented on a change in pull request #3103:
URL: https://github.com/apache/iceberg/pull/3103#discussion_r718147313



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -283,6 +287,7 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, Stri
         // merged one will lead to the incorrect delete semantic.
         WriteResult result = e.getValue();
         RowDelta rowDelta = table.newRowDelta()
+            .validateFromSnapshot(lastCommittedSnapshotId)

Review comment:
       @Reo-LEI Can you please help me understand how #2867 helps in solving this [validation error](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L435)?
   
   I agree with you that we can handle the `lastCommittedSnapshotId` in a separate PR and get this one reviewed, because it will really speed up the commit time, which otherwise keeps growing. I have seen my flink job checkpoint time increase from 100-200ms to 10-15 minutes.




[GitHub] [iceberg] ayush-san commented on a change in pull request #3103: Flink: IcebergFilesCommitter validate dataFile exist start from last committed snapshot.

Posted by GitBox <gi...@apache.org>.
ayush-san commented on a change in pull request #3103:
URL: https://github.com/apache/iceberg/pull/3103#discussion_r713615385



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -283,6 +287,7 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, Stri
         // merged one will lead to the incorrect delete semantic.
         WriteResult result = e.getValue();
         RowDelta rowDelta = table.newRowDelta()
+            .validateFromSnapshot(lastCommittedSnapshotId)

Review comment:
       What will be the value of `lastCommittedSnapshotId` if I restore the job from a savepoint?
   
   Shouldn't we do something similar for it as we do for `maxCommittedCheckpointId`? Although there is more to consider here, for example if we run the expire snapshots maintenance procedure before restoring the job from the savepoint (https://github.com/apache/iceberg/issues/2482#issuecomment-897017924):
   
   ```
   this.lastCommittedSnapshotId = getLastCommittedSnapshotId(table);
   ```
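   
   A minimal sketch of what such a lookup could look like, assuming the job can be identified through the `flink.job-id` snapshot summary property that the committer already writes (the method and class names here are hypothetical):
   
   ```
   import java.util.Map;
   import org.apache.iceberg.Snapshot;
   import org.apache.iceberg.Table;
   
   class LastCommittedSnapshotSketch {
     // Walk back from the current snapshot and return the newest snapshot committed by
     // this Flink job; null means nothing was found and the committer would fall back
     // to validating the full snapshot history.
     static Long getLastCommittedSnapshotId(Table table, String flinkJobId) {
       Snapshot snapshot = table.currentSnapshot();
       while (snapshot != null) {
         Map<String, String> summary = snapshot.summary();
         if (flinkJobId.equals(summary.get("flink.job-id"))) {
           return snapshot.snapshotId();
         }
         Long parentId = snapshot.parentId();
         snapshot = parentId != null ? table.snapshot(parentId) : null;
       }
       return null;
     }
   }
   ```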




[GitHub] [iceberg] ayush-san commented on a change in pull request #3103: Flink: IcebergFilesCommitter validate dataFile exist start from last committed snapshot.

Posted by GitBox <gi...@apache.org>.
ayush-san commented on a change in pull request #3103:
URL: https://github.com/apache/iceberg/pull/3103#discussion_r718268095



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -283,6 +287,7 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, Stri
         // merged one will lead to the incorrect delete semantic.
         WriteResult result = e.getValue();
         RowDelta rowDelta = table.newRowDelta()
+            .validateFromSnapshot(lastCommittedSnapshotId)

Review comment:
       Yes, but with your PR we can run the snapshot expire task while the flink job is running, since you are updating the lastCommittedSnapshotId. The only case left now is that when we start the flink job from a checkpoint, we will encounter the same problem.
   
   But if we are doing that for one case, we can mimic the same for the case where we restore the job from a checkpoint (a rough sketch of what that could look like is below). Anyway, we can tackle this in a separate PR and discuss it with @rdblue and @openinx.
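   
   For illustration, persisting the id in Flink operator state could look roughly like the fragment below (the state name and the method split are assumptions; note that even a restored id may already have been expired, which is the concern raised earlier):
   
   ```
   import java.util.Iterator;
   import org.apache.flink.api.common.state.ListState;
   import org.apache.flink.api.common.state.ListStateDescriptor;
   import org.apache.flink.runtime.state.StateInitializationContext;
   import org.apache.flink.runtime.state.StateSnapshotContext;
   
   // Fragment of a committer operator, for illustration only.
   abstract class SnapshotIdStateSketch {
     private transient ListState<Long> lastCommittedSnapshotIdState;
     private transient Long lastCommittedSnapshotId;
   
     // Would be called from the operator's initializeState(StateInitializationContext).
     void restoreLastCommittedSnapshotId(StateInitializationContext context) throws Exception {
       lastCommittedSnapshotIdState = context.getOperatorStateStore()
           .getListState(new ListStateDescriptor<>("last-committed-snapshot-id", Long.class));
       if (context.isRestored()) {
         Iterator<Long> restored = lastCommittedSnapshotIdState.get().iterator();
         // The restored id may point to a snapshot that has since been expired.
         lastCommittedSnapshotId = restored.hasNext() ? restored.next() : null;
       }
     }
   
     // Would be called from the operator's snapshotState(StateSnapshotContext).
     void persistLastCommittedSnapshotId(StateSnapshotContext context) throws Exception {
       lastCommittedSnapshotIdState.clear();
       if (lastCommittedSnapshotId != null) {
         lastCommittedSnapshotIdState.add(lastCommittedSnapshotId);
       }
     }
   }
   ```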




[GitHub] [iceberg] Reo-LEI commented on pull request #3103: Flink: IcebergFilesCommitter validate dataFile exist start from last committed snapshot.

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on pull request #3103:
URL: https://github.com/apache/iceberg/pull/3103#issuecomment-946719127


   Closing in favor of #3258.


[GitHub] [iceberg] Reo-LEI commented on a change in pull request #3103: Flink: IcebergFilesCommitter validate dataFile exist start from last committed snapshot.

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #3103:
URL: https://github.com/apache/iceberg/pull/3103#discussion_r718183168



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -283,6 +287,7 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, Stri
         // merged one will lead to the incorrect delete semantic.
         WriteResult result = e.getValue();
         RowDelta rowDelta = table.newRowDelta()
+            .validateFromSnapshot(lastCommittedSnapshotId)

Review comment:
       @ayush-san  I'm very sorry, this is my mistake. I meant to link to https://github.com/apache/iceberg/issues/3102#issuecomment-919073751, not #2867.
   
   And I think this PR will not solve https://github.com/apache/iceberg/issues/2482. If the validation error is not fixed, this PR will encounter the same problem. I think we should follow https://github.com/apache/iceberg/pull/2603#issuecomment-861831900 and check whether the referenced files still exist in order to fix the validation error.
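   
   One possible reading of checking whether the referenced files still exist (purely illustrative; this is not necessarily what #2603 or this PR implement) is to probe them through the table's FileIO before committing:
   
   ```
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.io.InputFile;
   
   class ReferencedFileCheckSketch {
     // Keep only the referenced data file paths whose files still exist in storage.
     // InputFile.exists() issues one existence check per file, so this adds I/O cost.
     static List<CharSequence> stillExistingFiles(Table table, CharSequence[] referencedDataFiles) {
       List<CharSequence> existing = new ArrayList<>();
       for (CharSequence path : referencedDataFiles) {
         InputFile file = table.io().newInputFile(path.toString());
         if (file.exists()) {
           existing.add(path);
         }
       }
       return existing;
     }
   }
   ```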




[GitHub] [iceberg] Reo-LEI commented on pull request #3103: Flink: IcebergFilesCommitter validate dataFile exist start from last committed snapshot.

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on pull request #3103:
URL: https://github.com/apache/iceberg/pull/3103#issuecomment-917647361


   @openinx @stevenzwu @kbendick could you help to review this PR? :)


[GitHub] [iceberg] Reo-LEI closed pull request #3103: Flink: IcebergFilesCommitter validate dataFile exist start from last committed snapshot.

Posted by GitBox <gi...@apache.org>.
Reo-LEI closed pull request #3103:
URL: https://github.com/apache/iceberg/pull/3103


   


[GitHub] [iceberg] Reo-LEI commented on a change in pull request #3103: Flink: IcebergFilesCommitter validate dataFile exist start from last committed snapshot.

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #3103:
URL: https://github.com/apache/iceberg/pull/3103#discussion_r716338791



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -283,6 +287,7 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, Stri
         // merged one will lead to the incorrect delete semantic.
         WriteResult result = e.getValue();
         RowDelta rowDelta = table.newRowDelta()
+            .validateFromSnapshot(lastCommittedSnapshotId)

Review comment:
       Thanks for your review @ayush-san. As I listed in https://github.com/apache/iceberg/pull/2867, on restart or restore from a checkpoint/savepoint the `lastCommittedSnapshotId` will simply be reset to null, and `rowDelta` will traverse the full snapshot history to ensure the data files still exist and that the whole snapshot history is valid. If we delete the expired snapshots with the maintenance procedure before restoring the job, the validation will only traverse back to the snapshot committed by the expire snapshots maintenance procedure. I think that would be ok.
   
   This optimization is only trying to speed up committing results at runtime, and I want to keep it simple, so I think we don't need to save the `lastCommittedSnapshotId` and restore it from the checkpoint/savepoint. But I would like to hear @openinx @jackye1995 @stevenzwu @kbendick's opinions.



