Posted to dev@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2021/12/14 14:57:00 UTC

[jira] [Created] (FLINK-25305) Always wait for input channel state and result partition state get completed in AsyncRunnable

Yun Gao created FLINK-25305:
-------------------------------

             Summary: Always wait for input channel state and result partition state get completed in AsyncRunnable
                 Key: FLINK-25305
                 URL: https://issues.apache.org/jira/browse/FLINK-25305
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Checkpointing
    Affects Versions: 1.15.0
            Reporter: Yun Gao


 
{code:java}
29245 [jobmanager-io-thread-12] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 16 by task 07fea3eb73acb4898317b4aa2c9fea30 of job da6de908107aa847cde5e9e0beb4812b at 064277c9-73dc-4bf2-8729-91ab16bbe8c6 @ localhost (dataPort=-1).
org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:321) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:158) ~[classes/:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_271]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_271]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_271]
Caused by: org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 16 for operator keyed (1/5)#5.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:299) ~[classes/:?]
    ... 4 more
Caused by: org.apache.flink.util.SerializedThrowable
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) ~[classes/:?]
    at org.apache.flink.util.Preconditions.checkCompletedNormally(Preconditions.java:261) ~[classes/:?]
    at org.apache.flink.util.concurrent.FutureUtils.checkStateAndGet(FutureUtils.java:1193) ~[classes/:?]
    at org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder.build(CheckpointMetricsBuilder.java:133) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportCompletedSnapshotStates(AsyncCheckpointRunnable.java:248) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:139) ~[classes/:?]
    ... 3 more
 {code}
 


When both unaligned checkpoints and final checkpoints are enabled, some checkpoints fail in the async phase with the above exception, which indicates that the checkpoint metric futures have not all been completed.

The exception is most likely caused as follows: when an operator has already been closed in the first run, or when a task is restored after having fully finished previously, we skip snapshotting the state of the operators when taking a checkpoint. In particular, we also do not include the InputChannelStates and the ResultPartitionState attached to the operator. With unaligned checkpoints, this leads to the following bad case:

1. The task receives the first barrier.
2. Following the unaligned checkpoint process, the task snapshots the state of the operators.
3. The checkpoint starts its asynchronous part.
4. Normally, the asynchronous part waits until all state futures are done, including the channel states and result partition states. This ensures that the asynchronous part waits until the last barrier has arrived. With closed operators or fully finished tasks, however, this guarantee no longer holds.
5. The asynchronous part then fails, because when it tries to build the CheckpointMetrics, the alignment for this checkpoint is in fact not done yet.
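
The race in the steps above can be reproduced in miniature with plain CompletableFutures. This is only a rough sketch under stated assumptions, not the actual Flink code: the class AsyncPhaseSketch, its methods, and the alwaysWaitForChannelState flag are all hypothetical names made up for illustration, and the alignment metric is reduced to a single future that completes when the last barrier arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncPhaseSketch {

    // Hypothetical stand-in for a "must already be complete" check on a metric future.
    static long checkStateAndGet(CompletableFuture<Long> future) {
        if (!future.isDone()) {
            throw new IllegalStateException("metric future not completed");
        }
        return future.join();
    }

    // Simplified asynchronous part: block on the collected state futures,
    // then read the alignment metric without waiting for it.
    static long runAsyncPart(
            boolean operatorsFinished,
            boolean alwaysWaitForChannelState,
            CompletableFuture<Void> channelState,
            CompletableFuture<Long> alignmentDuration) {
        List<CompletableFuture<?>> toWaitFor = new ArrayList<>();
        // Bad case: for finished operators the channel state future is skipped.
        if (!operatorsFinished || alwaysWaitForChannelState) {
            toWaitFor.add(channelState);
        }
        for (CompletableFuture<?> future : toWaitFor) {
            future.join();
        }
        return checkStateAndGet(alignmentDuration);
    }

    // Simulates the last barrier arriving on another thread: alignment finishes
    // first, and only then does the channel state future complete.
    static void lastBarrierArrives(
            CompletableFuture<Void> channelState,
            CompletableFuture<Long> alignmentDuration) {
        CompletableFuture.runAsync(() -> {
            alignmentDuration.complete(42L);
            channelState.complete(null);
        });
    }

    public static void main(String[] args) {
        // Finished operators, channel state skipped, last barrier not yet arrived.
        CompletableFuture<Void> channelState1 = new CompletableFuture<>();
        CompletableFuture<Long> alignment1 = new CompletableFuture<>();
        try {
            runAsyncPart(true, false, channelState1, alignment1);
        } catch (IllegalStateException e) {
            System.out.println("bad case: " + e.getMessage());
        }

        // Same situation, but the async part always waits for the channel state.
        CompletableFuture<Void> channelState2 = new CompletableFuture<>();
        CompletableFuture<Long> alignment2 = new CompletableFuture<>();
        lastBarrierArrives(channelState2, alignment2);
        System.out.println("with wait: " + runAsyncPart(true, true, channelState2, alignment2));
    }
}
```

In this sketch, waiting on the channel state future even when the operators are finished is what restores the ordering: the metric future is only read after the last barrier has been processed. Whether the actual fix takes exactly this shape is an assumption based on the issue summary.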

 


