Posted to issues@flink.apache.org by "Alex (JIRA)" <ji...@apache.org> on 2019/05/29 09:43:00 UTC

[jira] [Comment Edited] (FLINK-12482) Make checkpoint trigger/notifyComplete run via the mailbox queue

    [ https://issues.apache.org/jira/browse/FLINK-12482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16850703#comment-16850703 ] 

Alex edited comment on FLINK-12482 at 5/29/19 9:42 AM:
-------------------------------------------------------

Some technical notes/details of the possible approach (discussed offline with [~srichter]):

Current state of affairs:

Checkpoint triggers are handled in {{StreamTask.performCheckpoint}}. The method runs either in the main task thread (for non-source tasks, on {{triggerCheckpointOnBarrier}}) or in an auxiliary thread (for source tasks, on {{triggerCheckpoint}} invoked via RPC).

For synchronous savepoints (FLINK-11458), the code in {{StreamTask.performCheckpoint}} blocks (without explicitly acquiring the checkpoint lock).
It stays blocked until the corresponding checkpoint is reported complete or the task is cancelled. The completion notification arrives via {{notifyCheckpointComplete}}, called from an auxiliary thread invoked via RPC.
For non-source tasks, this means the main task thread resumes its work only after the checkpoint notification.
For source tasks, this blocking has little impact on the main task thread.
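
To make the current behaviour concrete, here is a minimal sketch of latch-based blocking. It is not Flink's actual code: the class {{SyncSavepointLatchSketch}}, the {{Outcome}} enum and the method bodies are hypothetical stand-ins, and treating an interrupt as cancellation is an assumption made only to keep the example small.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a task; NOT Flink's StreamTask.
public class SyncSavepointLatchSketch {
    public enum Outcome { COMPLETED, CANCELLED }

    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile Outcome outcome;

    // Runs on whichever thread triggered the checkpoint and blocks it
    // until completion is notified or the task is cancelled.
    public Outcome performCheckpoint() {
        // ... the state snapshot would be taken here ...
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Assumption: an interrupt is treated like a cancellation.
            Thread.currentThread().interrupt();
            return Outcome.CANCELLED;
        }
        return outcome;
    }

    // Called from an auxiliary (RPC) thread.
    public void notifyCheckpointComplete() { release(Outcome.COMPLETED); }

    // Called when the task is cancelled.
    public void cancel() { release(Outcome.CANCELLED); }

    // Only the first caller decides the outcome and opens the latch.
    private synchronized void release(Outcome result) {
        if (outcome == null) {
            outcome = result;
            latch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SyncSavepointLatchSketch task = new SyncSavepointLatchSketch();
        Thread rpc = new Thread(task::notifyCheckpointComplete, "rpc-thread");
        rpc.start();
        System.out.println(task.performCheckpoint());
        rpc.join();
    }
}
```

In this sketch the thread calling {{performCheckpoint}} does nothing else while it waits, which is why the blocking matters for non-source tasks (main thread) and much less for source tasks (auxiliary thread).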

Proposed change:
1. For source tasks, {{triggerCheckpoint}} would be handled by the main task thread. When it blocks (synchronous savepoints), the source task would also stop processing other mailbox activities. We believe this change is in line with FLINK-11458 (even desirable, but difficult to implement before);
2. {{notifyCheckpointComplete}} can also be delegated to the task's main thread. This comes with an additional change for synchronous savepoints: the main thread would be "blocked" on the mailbox (instead of on the synchronous checkpoint latch), "waiting" for the special checkpoint letter in the mailbox queue. All other messages except the exit message would be discarded. We believe this is also in line with FLINK-11458, because a synchronous checkpoint implies that the task is finished/stopped once the checkpoint is complete.
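
The second point could look roughly like the sketch below. It only illustrates the idea of "blocking on the mailbox"; the names {{MailboxSavepointSketch}}, {{Letter}}, {{Kind}} and {{awaitSavepointCompletion}} are hypothetical and do not correspond to Flink's mailbox API. Discarding completion letters for other checkpoint ids is an assumption.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical mailbox sketch; NOT Flink's mailbox implementation.
public class MailboxSavepointSketch {
    public enum Kind { CHECKPOINT_COMPLETE, EXIT, OTHER }

    public static final class Letter {
        final Kind kind;
        final long checkpointId;

        public Letter(Kind kind, long checkpointId) {
            this.kind = kind;
            this.checkpointId = checkpointId;
        }
    }

    private final BlockingQueue<Letter> mailbox = new LinkedBlockingQueue<>();
    private int discarded; // touched only by the main task thread

    // Called from any thread (e.g. RPC) to enqueue a letter.
    public void send(Letter letter) {
        mailbox.add(letter);
    }

    // Runs on the main task thread: waits for the completion letter of the
    // given checkpoint, drops everything else, and stops on an exit letter.
    public Kind awaitSavepointCompletion(long checkpointId) {
        while (true) {
            Letter letter;
            try {
                letter = mailbox.take();
            } catch (InterruptedException e) {
                // Assumption: an interrupt is handled like an exit letter.
                Thread.currentThread().interrupt();
                return Kind.EXIT;
            }
            if (letter.kind == Kind.EXIT) {
                return Kind.EXIT;
            }
            if (letter.kind == Kind.CHECKPOINT_COMPLETE
                    && letter.checkpointId == checkpointId) {
                return Kind.CHECKPOINT_COMPLETE;
            }
            discarded++; // any other letter is discarded
        }
    }

    public int discardedCount() {
        return discarded;
    }

    public static void main(String[] args) throws InterruptedException {
        MailboxSavepointSketch task = new MailboxSavepointSketch();
        Thread rpc = new Thread(() -> {
            task.send(new Letter(Kind.OTHER, -1L));
            task.send(new Letter(Kind.CHECKPOINT_COMPLETE, 7L));
        }, "rpc-thread");
        rpc.start();
        Kind result = task.awaitSavepointCompletion(7L);
        rpc.join();
        System.out.println(result + ", discarded=" + task.discardedCount());
    }
}
```

Compared with a latch, the main thread here keeps draining the queue while it waits, so an exit letter can still end the wait and no second thread has to touch task state.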




was (Author: 1u0):
Some technical notes/details of the possible approach (discussed offline with [~srichter]):

Current state of affairs:

The checkpoint trigger handling is done in {{StreamTask.performCheckpoint}}. The method is executed in the main task thread (for non source tasks, on {{triggerCheckpointOnBarrier}}) and in an auxiliary thread (for source tasks, on {{triggerCheckpoint}} invoked via RPC).

For synchronous savepoints (FLINK-11458) the code in {{StreamTask.performCheckpoint}} would block (without explicit acquisition of the checkpoint lock).
The blocking happens until the corresponding checkpoint is not notified to be complete (or if the task has been cancelled) (notification happens via {{notifyCheckpointComplete}} by an auxiliary thread invoked via RPC).
For non source tasks, it means that only after checkpoint notification, the main task thread would resume it's work.
For source tasks, such blocking hasn't much impact on the main task thread.

Proposed change:
1. For the source tasks, the {{triggerCheckpoint}} would be handled by the main task thread. In case of blocking (synchronous savepoints), it means that source task would also stop processing other mailbox activities. We suppose that such change is aligned with FLINK-11458 (even desired, but was difficult to implement before);
2. {{notifyCheckpointComplete}} also can be delegated to the task's main thread. With an additional change (for synchronous savepoints), that the main thread would be "blocked" on the mailbox (instead of the synchronous checkpoint latch) by "waiting" for the special checkpoint letter in the mailbox queue. All other messages would be discarded (except exit message) - we suppose that this is also aligned with FLINK-11458, because the synchronous checkpoint implies that the task would be finished/stopped after the checkpoint is complete.



> Make checkpoint trigger/notifyComplete run via the mailbox queue
> ----------------------------------------------------------------
>
>                 Key: FLINK-12482
>                 URL: https://issues.apache.org/jira/browse/FLINK-12482
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Operators
>            Reporter: Stefan Richter
>            Assignee: Alex
>            Priority: Major
>
> For the stream source, we also need to enqueue checkpoint-related signals (trigger, notifyComplete) to the mailbox now so that they run in the stream task's main thread.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)