Posted to issues@flink.apache.org by "Yunfeng Zhou (Jira)" <ji...@apache.org> on 2022/07/19 07:22:00 UTC

[jira] [Commented] (FLINK-26029) Generalize the checkpoint protocol of OperatorCoordinator.

    [ https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568386#comment-17568386 ] 

Yunfeng Zhou commented on FLINK-26029:
--------------------------------------

Our basic idea for resolving this problem is to extend the blocking period. Communication should be blocked before the OC starts its checkpoint and reopened only after the subtasks finish theirs, so that the OC and the subtasks have a consistent view of the messages when each of them takes its checkpoint. In addition, communication should be blocked in both directions, not just for events from the OC to the subtasks, and the blocked messages should be stored in the checkpoint snapshot so that they can be restored on recovery.
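
To make the idea concrete, here is a minimal sketch of a gate that blocks events in both directions and exposes the blocked events so they can be written into the snapshot. It is only an illustration under simplifying assumptions: CheckpointEventGate and all of its methods are hypothetical names, not existing Flink classes or the API of the eventual change, events are plain strings, and everything runs on a single thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical two-way event gate; an illustration only, not a Flink class. */
class CheckpointEventGate {
    private boolean closed = false;

    // Events held back while the gate is closed, one buffer per direction.
    private final List<String> blockedToSubtasks = new ArrayList<>();
    private final List<String> blockedToCoordinator = new ArrayList<>();

    // Stand-ins for the real delivery channels, so the effect can be observed.
    private final List<String> deliveredToSubtasks = new ArrayList<>();
    private final List<String> deliveredToCoordinator = new ArrayList<>();

    /** Assumed to be called before the coordinator starts its checkpoint. */
    void close() {
        closed = true;
    }

    /** Coordinator -> subtask direction. */
    void sendToSubtask(String event) {
        (closed ? blockedToSubtasks : deliveredToSubtasks).add(event);
    }

    /** Subtask -> coordinator direction. */
    void sendToCoordinator(String event) {
        (closed ? blockedToCoordinator : deliveredToCoordinator).add(event);
    }

    /**
     * Copies of the blocked events, to be stored with the checkpoint.
     * Index 0 holds events bound for subtasks, index 1 events bound for the coordinator.
     */
    List<List<String>> snapshotBlockedEvents() {
        return List.of(List.copyOf(blockedToSubtasks), List.copyOf(blockedToCoordinator));
    }

    /** Assumed to be called after the subtasks finish the checkpoint. */
    void reopen() {
        // Flush blocked events in their original order, then let traffic through again.
        deliveredToSubtasks.addAll(blockedToSubtasks);
        deliveredToCoordinator.addAll(blockedToCoordinator);
        blockedToSubtasks.clear();
        blockedToCoordinator.clear();
        closed = false;
    }

    List<String> deliveredToSubtasks() {
        return deliveredToSubtasks;
    }

    List<String> deliveredToCoordinator() {
        return deliveredToCoordinator;
    }
}
```

With this sketch, an event sent in either direction between close() and reopen() is neither delivered nor lost: it shows up in snapshotBlockedEvents() and is delivered once the gate reopens. A real implementation would also have to deal with concurrency, per-subtask gates, and failures while the gate is closed, none of which is modeled here.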

> Generalize the checkpoint protocol of OperatorCoordinator.
> ----------------------------------------------------------
>
>                 Key: FLINK-26029
>                 URL: https://issues.apache.org/jira/browse/FLINK-26029
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.3
>            Reporter: Jiangjie Qin
>            Assignee: Yunfeng Zhou
>            Priority: Major
>              Labels: extensibility, pull-request-available, stale-assigned
>             Fix For: 1.16.0
>
>
> Currently, the JM uses OperatorEventValve to control the communication from OperatorCoordinator (OC) to the subtasks in order to achieve state consistency across OC and subtasks in case of job failures. The valve is closed when a checkpoint starts and reopened after the checkpoint barriers are sent to the source subtasks.
> While this mechanism works for the source operators, it unnecessarily restricts the general use of OC in the following ways:
> - It does not handle (e.g. block) the control flow messages from subtasks to OC.
> - It does not handle the control flow messages from OC to subtasks which are sent after checkpoint barriers have been sent to sources but before subtasks have received those barriers.
> If these limitations are not addressed, consistency issues might occur. For example, if a subtask sends a message to its coordinator after the checkpoint barriers have been sent to the sources but before the subtask receives the barrier, the message would be recorded in the subtask's snapshot but not in the coordinator's. When the Flink job recovers from this snapshot, the subtask would have a record of sending the message while the coordinator would have no record of receiving it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)