You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "ming li (Jira)" <ji...@apache.org> on 2021/03/26 09:27:00 UTC

[jira] [Created] (FLINK-21990) Double check Task status when perform checkpoint.

ming li created FLINK-21990:
-------------------------------

             Summary: Double check Task status when perform checkpoint.
                 Key: FLINK-21990
                 URL: https://issues.apache.org/jira/browse/FLINK-21990
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.11.0
            Reporter: ming li


We need to double check Task status when making Checkpoint. Otherwise, after a Task failed, the checkpoint may still be made successfully.



For example, I try to throw an exception at 17:10:24.069, get the lock at 17:10:24.070 and start making Checkpoint, and finish making Checkpoint at 17:10:24.373.
{code:java}
17:10:24.069 [Legacy Source Thread - Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  org.apache.flink.test.checkpointing.RegionCheckpointITCase  - throw expected exception
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  org.apache.flink.runtime.state.AbstractSnapshotStrategy  - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0,5,Flink Task Threads] took 0 ms.
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  org.apache.flink.test.checkpointing.RegionCheckpointITCase  - sleep 300 ms
17:10:24.372 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  org.apache.flink.runtime.state.AbstractSnapshotStrategy  - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0,5,Flink Task Threads] took 0 ms.
17:10:24.373 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 2 for job 4a08c4a50d00dfd56f86eb6ccb83b89c (0 bytes in 1137 ms).

{code}
 

 

From the code point of view, we only judged the state of the task at the beginning, and when the lock was obtained, we directly started to make the Checkpoint.
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {

    if (isRunning) {
        actionExecutor.runThrowing(
                () -> {//do checkpoint});
        return true;
    } else {
        ...
    }
}{code}
However, during the period of acquiring the lock, the task state is likely to change. Compared with the Flink 1.9 version code, the 1.9 version judges the task status after acquiring the lock.

 
{code:java}
private boolean performCheckpoint(
      CheckpointMetaData checkpointMetaData,
      CheckpointOptions checkpointOptions,
      CheckpointMetrics checkpointMetrics,
      boolean advanceToEndOfTime) throws Exception {

   LOG.debug("Starting checkpoint ({}) {} on task {}",
      checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

   final long checkpointId = checkpointMetaData.getCheckpointId();

   synchronized (lock) {
      if (isRunning) {
         //do checkpoint
      } else {
        ...
      }
 }{code}
 

Therefore, I think we need to double check the task status to avoid the situation where the task fails but the Checkpoint can still succeed in the process of acquiring the lock.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)