Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/20 10:11:15 UTC

[GitHub] [iceberg] cgpoh opened a new issue, #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

cgpoh opened a new issue, #6630:
URL: https://github.com/apache/iceberg/issues/6630

   ### Query engine
   
   Flink
   
   ### Question
   
   I have a Flink job that uses a side output to write to an Iceberg table when there are errors in the main processing function. If there are no errors in the processing function, no data files are added for commit. I noticed that the Flink job keeps restarting and throws the following exception:
   ```
   IcebergFilesCommitter -> Sink: iceberg-error-sink-FPL (1/1)#152 (2b5a4587aa3a50c531671a32a2f1538c) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Cannot determine partition spec: no data files have been added
   	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:502)
   	at org.apache.iceberg.MergingSnapshotProducer.dataSpec(MergingSnapshotProducer.java:150)
   	at org.apache.iceberg.BaseReplacePartitions.apply(BaseReplacePartitions.java:133)
   	at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:223)
   	at org.apache.iceberg.BaseReplacePartitions.apply(BaseReplacePartitions.java:26)
   	at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:369)
   	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:402)
   	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)
   	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
   	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
   	at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:367)
   	at org.apache.iceberg.BaseReplacePartitions.commit(BaseReplacePartitions.java:26)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:372)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.replacePartitions(IcebergFilesCommitter.java:314)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitPendingResult(IcebergFilesCommitter.java:270)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:255)
   	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.notifyCheckpointComplete(IcebergFilesCommitter.java:229)
   	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
   	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
   	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409)
   	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
   	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
   	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
   	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
   	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
   	at java.base/java.lang.Thread.run(Unknown Source)
   ```
   I saw that in the `commitPendingResult` function of IcebergFilesCommitter.java, there's a condition that checks whether to skip an empty commit. However, once MAX_CONTINUOUS_EMPTY_COMMITS is reached, it proceeds to commit even though there are no data files to commit, which throws the above exception.
   ```java
   long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
   continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
   if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
     if (replacePartitions) {
       replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
     } else {
       commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
     }
     continuousEmptyCheckpoints = 0;
   } else {
     LOG.info("Skipping committing empty checkpoint {}", checkpointId);
   }
   ```
   May I know what's the purpose of this empty commit?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1610804650

   The Flink snapshot/checkpoint state is kept in 3 places:
   - Flink internal state - list of temp manifest files
   - File system - temp manifest files and uncommitted data files
   - Iceberg table data/metadata
   
   These 3 need to be in sync, and we need to keep the changes since the last sync. So if we do not commit to Iceberg, the Flink internal state and the temp files on the file system keep growing. To avoid this, we commit from time to time (and write Flink metadata to the Iceberg table in the process); after this commit we are able to remove old temp files and clean some data from the Flink state.
   
   So writing an empty commit is intentional/needed, but the failure seems like a bug.
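
The periodic empty commit described above can be illustrated with a small standalone simulation. This is only a sketch of the counter logic quoted in the issue, not the actual `IcebergFilesCommitter`; the class `EmptyCommitSketch`, its methods, and the threshold of 10 used in `main` are hypothetical names and values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the empty-commit counter from the issue.
public class EmptyCommitSketch {
  private final int maxContinuousEmptyCommits;
  private int continuousEmptyCheckpoints = 0;

  public EmptyCommitSketch(int maxContinuousEmptyCommits) {
    this.maxContinuousEmptyCommits = maxContinuousEmptyCommits;
  }

  // Returns true when the checkpoint would be committed to the table,
  // false when it would be skipped as an empty checkpoint.
  public boolean shouldCommit(long totalFiles) {
    continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
    if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
      continuousEmptyCheckpoints = 0;
      return true;
    }
    return false;
  }

  // Replays a sequence of checkpoints (1-based ids) and collects the committed ones.
  public static List<Integer> committedCheckpoints(int max, long[] filesPerCheckpoint) {
    EmptyCommitSketch sketch = new EmptyCommitSketch(max);
    List<Integer> committed = new ArrayList<>();
    for (int i = 0; i < filesPerCheckpoint.length; i++) {
      if (sketch.shouldCommit(filesPerCheckpoint[i])) {
        committed.add(i + 1);
      }
    }
    return committed;
  }

  public static void main(String[] args) {
    long[] files = new long[12]; // twelve checkpoints, all empty by default
    files[1] = 3;                // only checkpoint 2 produced files
    // Checkpoint 2 commits because it has files; checkpoint 12 commits
    // because it is the 10th consecutive empty checkpoint.
    System.out.println(committedCheckpoints(10, files)); // prints [2, 12]
  }
}
```

In this model, an idle sink still commits once every `maxContinuousEmptyCommits` checkpoints, which is the point at which the temp files and Flink state mentioned above can be cleaned up.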




[GitHub] [iceberg] dou-dou commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "dou-dou (via GitHub)" <gi...@apache.org>.
dou-dou commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1610854411

   @pvary Thanks for your reply.




[GitHub] [iceberg] cgpoh commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "cgpoh (via GitHub)" <gi...@apache.org>.
cgpoh commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1621213494

   > @pvary I have found that the exception is caused by my unreasonable configuration. Thank you very much for your help.
   
   Hi @dou-dou, what was your unreasonable configuration? I'd like to check whether my configuration is unreasonable too.




[GitHub] [iceberg] hililiwei commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1621091964

   Yes. If `overwrite` is not supposed to be used in streaming tasks, should we add a check to prevent this misuse? @pvary
   




[GitHub] [iceberg] dou-dou commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "dou-dou (via GitHub)" <gi...@apache.org>.
dou-dou commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1610634508

   I also encountered this exception. Did you find the reason? I would also like to know the purpose of MAX_CONTINUOUS_EMPTY_COMMITS.




[GitHub] [iceberg] dou-dou commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "dou-dou (via GitHub)" <gi...@apache.org>.
dou-dou commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1612504744

   @pvary 
   **Version**
   flink 1.12.7
   iceberg 0.13.1
   **Question**
   I am using [chunjun](https://github.com/DTStack/chunjun) to incrementally sync data from MySQL to Iceberg. The Flink job runs, but when there is no incremental data, that is **totalFiles == 0**, and the number of consecutive empty checkpoints is a multiple of MAX_CONTINUOUS_EMPTY_COMMITS, that is **continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0**, it throws the following exception:
   ![image](https://github.com/apache/iceberg/assets/16593243/af4d38e6-4775-4731-88d5-945164dc6d70)
   The Flink job then restarts and keeps running. The next exception is thrown when checkpointId is 21 (the previous one was thrown when checkpointId was 12).
   I would expect that when **totalFiles == 0**, no exception is thrown when executing `operation.commit()`.




[GitHub] [iceberg] dou-dou commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "dou-dou (via GitHub)" <gi...@apache.org>.
dou-dou commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1617431974

   @pvary Thanks for your advice.




[GitHub] [iceberg] cgpoh commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "cgpoh (via GitHub)" <gi...@apache.org>.
cgpoh commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1621528483

   Thanks @dou-dou for your reply! My `replacePartitions` flag is set to false, and hence exceptions are thrown on empty commits.




[GitHub] [iceberg] dou-dou commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "dou-dou (via GitHub)" <gi...@apache.org>.
dou-dou commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1621060004

   @pvary I have found that the exception is caused by my unreasonable configuration. Thank you very much for your help.




[GitHub] [iceberg] pvary commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1612447663

   @dou-dou: Could you please share the Flink+Iceberg version you are using?




[GitHub] [iceberg] pvary commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1617384809

   Seems like a bug to me. When we are writing data to the IcebergSink and no data comes in for the specified `maxContinuousEmptyCommits` number of checkpoints, we try to commit an empty changeset with the following code in `IcebergFilesCommitter`:
   ```java
       if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
         if (replacePartitions) {
           replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
         } else {
           commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
         }
         continuousEmptyCheckpoints = 0;
       } else {
         LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
       }
   ```
   
   I think we might want to handle empty commits with a different code path. Maybe we just switch this case to be handled by `commitDeltaTxn` regardless of the `replacePartitions` value.
   
   I do not have enough time nowadays to write and test the patch, but if you have time to write it, I will try to find time to review it.
   
   CC: @stevenzwu, @hililiwei 
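
The suggestion above, routing empty commits through the delta path whatever the `replacePartitions` value is, could look roughly like the following decision function. This is a sketch under stated assumptions and not the actual patch: `CommitPathSketch`, `CommitPath`, and `choose` are hypothetical names, and `continuousEmptyCheckpoints` is assumed to be the counter value after it has already been incremented for the current checkpoint.

```java
// Hypothetical sketch of the proposed routing; not the real IcebergFilesCommitter.
public class CommitPathSketch {
  public enum CommitPath { SKIP, REPLACE_PARTITIONS, DELTA_TXN }

  // Decides which commit path a checkpoint would take.
  // continuousEmptyCheckpoints: consecutive empty checkpoints, including this one.
  public static CommitPath choose(
      long totalFiles,
      int continuousEmptyCheckpoints,
      int maxContinuousEmptyCommits,
      boolean replacePartitions) {
    if (totalFiles == 0) {
      // Empty checkpoint: never use the replace-partitions path, because it
      // cannot determine a partition spec without data files.
      return continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0
          ? CommitPath.DELTA_TXN
          : CommitPath.SKIP;
    }
    // Non-empty checkpoint: keep the existing behavior.
    return replacePartitions ? CommitPath.REPLACE_PARTITIONS : CommitPath.DELTA_TXN;
  }

  public static void main(String[] args) {
    // 10th consecutive empty checkpoint with replacePartitions enabled.
    System.out.println(choose(0, 10, 10, true)); // prints DELTA_TXN
    // Non-empty checkpoint with replacePartitions enabled.
    System.out.println(choose(5, 0, 10, true));  // prints REPLACE_PARTITIONS
  }
}
```

The design choice here is to keep the periodic empty commit, which is needed for cleanup, while avoiding the code path that fails when no data files have been added.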




[GitHub] [iceberg] pvary commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1621046025

   @dou-dou: What is the Iceberg Sink configuration you are using?




[GitHub] [iceberg] hililiwei commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1619985313

   @pvary Yes, seems like a bug. But @cgpoh, do you overwrite the data of a partition every time?
   Committing an empty snapshot in `overwrite` mode triggers this error. I raised #7983, which follows @pvary's suggestion to handle this case regardless of the `replacePartitions` value.
   
   




[GitHub] [iceberg] pvary commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1621068395

   Still, as @hililiwei mentioned in her other comment, it might be good to prevent these "unreasonable" configurations, if they are indeed unreasonable 😄 




[GitHub] [iceberg] dou-dou commented on issue #6630: Purpose of MAX_CONTINUOUS_EMPTY_COMMITS in IcebergFilesCommitter

Posted by "dou-dou (via GitHub)" <gi...@apache.org>.
dou-dou commented on issue #6630:
URL: https://github.com/apache/iceberg/issues/6630#issuecomment-1621465999

   @cgpoh As **hililiwei** mentioned in her other comment, I'm probably using overwrite mode in the streaming task when I use [chunjun](https://github.com/DTStack/chunjun) to incrementally sync data from MySQL to Iceberg.
   
   

