You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Stephan Ewen (Jira)" <ji...@apache.org> on 2020/11/27 14:15:00 UTC

[jira] [Created] (FLINK-20396) Replace "OperatorCoordinator.subtaskFailed()" with "subtaskRestored()"

Stephan Ewen created FLINK-20396:
------------------------------------

             Summary: Replace "OperatorCoordinator.subtaskFailed()" with "subtaskRestored()"
                 Key: FLINK-20396
                 URL: https://issues.apache.org/jira/browse/FLINK-20396
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.11.2
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
             Fix For: 1.12.0, 1.11.3


There are no strong order guarantees between {{OperatorCoordinator.subtaskFailed()}} and {{OperatorCoordinator.notifyCheckpointComplete()}}.

It can happen that a checkpoint completes after the notification for task failure is sent:
  - {{OperatorCoordinator.checkpoint()}}
  - {{OperatorCoordinator.subtaskFailed()}}
  - {{OperatorCoordinator.checkpointComplete()}}

The subtask failure here does not know whether the previous checkpoint completed or not. It cannot decide what state the subtask will be in after recovery.
There is no easy fix right now to strictly guarantee the order of the method calls, so alternatively we need to provide the necessary information to reason about the status of tasks.

We should replace {{OperatorCoordinator.subtaskFailed(int subtask)}} with {{OperatorCoordinator.subtaskRestored(int subtask, long checkpoint)}}. That implementations get the explicit checkpoint ID for the subtask recovery, and can align that with the IDs of checkpoints that were taken.

It is still (in rare cases) possible that for a specific checkpoint C, {{OperatorCoordinator.subtaskRestored(subtaskIndex, C)) comes before {{OperatorCoordinator.checkpointComplete(C)}}.


h3. Background

The Checkpointing Procedure is partially asynchronous on the {{JobManager}} / {{CheckpointCoordinator}}: After all subtasks acknowledged the checkpoint, the finalization (writing out metadata and registering the checkpoint in ZooKeeper) happens in an I/O thread, and the checkpoint completes after that.

This sequence of events can happen:
  - tasks acks checkpoint
  - checkpoint fully acknowledged, finalization starts
  - task fails
  - task failure notification is dispatched
  - checkpoint completes.

For task failures and checkpoint completion, no order is defined.

However, for task restore and checkpoint completion, the order is well defined: When a task is restored, pending checkpoints are either canceled or complete. None can be within finalization. That is currently guaranteed with a lock in the {{CheckpointCoordinator}}.
(An implication of that being that restores can be blocking operations in the scheduler, which is not ideal from the perspective of making the scheduler async/non-blocking, but it is currently essential for correctness).




--
This message was sent by Atlassian Jira
(v8.3.4#803005)