Posted to issues@flink.apache.org by "Piotr Nowojski (Jira)" <ji...@apache.org> on 2022/05/11 12:54:00 UTC

[jira] [Comment Edited] (FLINK-27251) Timeout aligned to unaligned checkpoint barrier in the output buffers of an upstream subtask

    [ https://issues.apache.org/jira/browse/FLINK-27251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534876#comment-17534876 ] 

Piotr Nowojski edited comment on FLINK-27251 at 5/11/22 12:53 PM:
------------------------------------------------------------------

I've looked at the code and I have a couple of high level comments/questions:

# Instead of overriding {{PipelinedResultPartition#broadcastEvent}} with a rather ugly {{instanceof}} check, I would try to move this code to {{SubtaskCheckpointCoordinatorImpl#checkpointState}}, shortly after broadcasting the {{CheckpointBarrier}}.
# {{AlignedCheckpointTimeoutHandle}} should not be executed outside of the task thread. {{InputGates}}, {{InputChannels}}, {{ResultPartitions}} and {{ResultSubpartition}} are mostly not thread-safe, and all interactions with them should happen through the task thread. The only exception is the Netty threads, which pull data from subpartitions and put data into channels. I would change this and make {{AlignedCheckpointTimeoutHandle}} a {{StreamTask#mainMailboxExecutor}} action/mail. [*] 
# It looks like in your PoC the completion of an aligned checkpoint will always be blocked on the timeout, regardless of whether the checkpoint barrier is sent downstream before the timeout or not? So without any backpressure, enabling the timeout would extend the duration of the aligned checkpoint?
# {{ChannelStateWriter}} might be blocked waiting for the output data future to complete, wasting resources/cycles?
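
To make points 2 and 3 more concrete, here is a minimal, self-contained Java sketch. It deliberately does not use Flink's real classes: {{AlignedTimeoutSketch}}, its {{mailbox}} queue and {{onAlignedTimeout}} are hypothetical stand-ins for the task's mailbox and for {{AlignedCheckpointTimeoutHandle}}. The timer thread only enqueues a mail; the barrier state is checked on the task thread, and the timeout is cancelled as soon as the barrier leaves the output buffer, so a checkpoint without backpressure does not wait for it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class AlignedTimeoutSketch {
    // Hypothetical stand-in for the task's mailbox: only the task thread drains it.
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    // Task-thread-confined state, mirroring the non-thread-safe partitions/subpartitions.
    private boolean barrierSent = false;
    private boolean switchedToUnaligned = false;

    /** Runs on the calling ("task") thread; returns true if the checkpoint switched to unaligned. */
    boolean runScenario(boolean barrierLeavesBufferInTime) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // The timer thread never touches task state; it only posts a mail.
            ScheduledFuture<?> timeout = timer.schedule(
                    () -> { mailbox.add(this::onAlignedTimeout); }, 20, TimeUnit.MILLISECONDS);
            if (barrierLeavesBufferInTime) {
                // No backpressure: the barrier is sent, so cancel the timeout
                // instead of letting checkpoint completion wait for it.
                barrierSent = true;
                timeout.cancel(false);
                // A mail posted just before cancellation is harmless: it sees barrierSent == true.
                Runnable mail;
                while ((mail = mailbox.poll()) != null) {
                    mail.run();
                }
            } else {
                // Backpressure: the barrier is stuck, so the task thread eventually runs the mail.
                Runnable mail = mailbox.poll(5, TimeUnit.SECONDS);
                if (mail != null) {
                    mail.run();
                }
            }
            return switchedToUnaligned;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return switchedToUnaligned;
        } finally {
            timer.shutdownNow();
        }
    }

    private void onAlignedTimeout() {
        // Executed by the task thread, so reading barrierSent needs no synchronization.
        if (!barrierSent) {
            switchedToUnaligned = true; // a real implementation would snapshot the output buffers here
        }
    }

    public static void main(String[] args) {
        System.out.println(new AlignedTimeoutSketch().runScenario(true));  // false
        System.out.println(new AlignedTimeoutSketch().runScenario(false)); // true
    }
}
```

The design choice illustrated here is that the timer only decides *when* to ask, while the task thread decides *what* to do, so no lock is needed around the task-owned state.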

[*] In case you haven't seen this: the main task thread runs a loop that prioritises processing new mails/actions over processing input records. The task thread spends all of its time in {{StreamTask#invoke}}, and in {{StreamTask#runMailboxLoop}} in particular.
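
A simplified, single-threaded model of that loop follows. It is only a sketch under stated assumptions: {{MailboxLoopSketch}}, {{runLoop}} and {{processRecord}} are hypothetical names, not Flink's actual mailbox implementation, and the mail posted while processing a record stands in for something like a timer callback that would really arrive from another thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class MailboxLoopSketch {
    // Hypothetical, single-threaded model of the task thread's loop.
    final Queue<Runnable> mails = new ArrayDeque<>();
    final Queue<String> inputRecords = new ArrayDeque<>();
    final List<String> trace = new ArrayList<>();

    void runLoop() {
        while (!mails.isEmpty() || !inputRecords.isEmpty()) {
            // Drain all pending mails first: they take priority over input records.
            Runnable mail;
            while ((mail = mails.poll()) != null) {
                mail.run();
            }
            // Then process a single input record and go back to check the mailbox.
            String record = inputRecords.poll();
            if (record != null) {
                processRecord(record);
            }
        }
    }

    void processRecord(String record) {
        trace.add("record:" + record);
        // Simulate a mail (e.g. a timeout action) being posted while a record is processed.
        if (record.equals("r1")) {
            mails.add(() -> trace.add("mail:m2"));
        }
    }

    public static void main(String[] args) {
        MailboxLoopSketch task = new MailboxLoopSketch();
        task.mails.add(() -> task.trace.add("mail:m1"));
        task.inputRecords.add("r1");
        task.inputRecords.add("r2");
        task.runLoop();
        System.out.println(task.trace); // [mail:m1, record:r1, mail:m2, record:r2]
    }
}
```

Because the mail {{m2}} is executed before record {{r2}}, an action enqueued into the mailbox never has to wait behind the whole input backlog.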


> Timeout aligned to unaligned checkpoint barrier in the output buffers of an upstream subtask
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27251
>                 URL: https://issues.apache.org/jira/browse/FLINK-27251
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.0, 1.15.0
>            Reporter: fanrui
>            Priority: Major
>             Fix For: 1.16.0
>
>
> After FLINK-23041, the downstream task can switch to UC (unaligned checkpoint) when {_}currentTime - triggerTime > timeout{_}. However, the downstream task still needs to wait for all barriers from upstream.
> If the backpressure is severe, the downstream task cannot receive all barriers within the checkpoint timeout, causing the checkpoint to fail.
>  
> Can we support switching the upstream task from aligned to UC? That is, when the barrier cannot be sent from the output buffer to the downstream task within the [execution.checkpointing.aligned-checkpoint-timeout|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-checkpointing-aligned-checkpoint-timeout], the upstream task switches to UC and takes a snapshot of the data ahead of the barrier in the output buffer.
>  
> Hi [~akalashnikov], please take a look when you have time. Thanks a lot.
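
The proposed upstream switch can be illustrated with the following sketch. It is only an illustration under stated assumptions: {{UpstreamTimeoutSketch}} and its methods are hypothetical, one subpartition is modelled as a queue of string buffers, and the timeout is measured from the checkpoint trigger time, mirroring the {_}currentTime - triggerTime > timeout{_} rule from FLINK-23041. Once the timeout expires while the barrier is still queued, the buffers ahead of it are recorded as in-flight data and the barrier overtakes them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class UpstreamTimeoutSketch {
    // Hypothetical marker for the checkpoint barrier inside the output queue.
    static final String BARRIER = "BARRIER";

    // Hypothetical model of one output subpartition: buffers waiting to be sent downstream.
    final Deque<String> outputBuffer = new ArrayDeque<>();
    // Buffers the barrier overtook; a real implementation would persist them as channel state.
    final List<String> snapshottedInFlightData = new ArrayList<>();
    private long checkpointTriggerTimeMs = -1;
    private boolean unaligned = false;

    void addData(String buffer) {
        outputBuffer.addLast(buffer);
    }

    void addAlignedBarrier(long triggerTimeMs) {
        outputBuffer.addLast(BARRIER);
        checkpointTriggerTimeMs = triggerTimeMs;
    }

    /** Meant to run on the task thread; returns true once the barrier has been converted to unaligned. */
    boolean maybeSwitchToUnaligned(long nowMs, long alignedCheckpointTimeoutMs) {
        if (unaligned) {
            return true;
        }
        boolean barrierStillQueued = outputBuffer.contains(BARRIER);
        if (!barrierStillQueued || nowMs - checkpointTriggerTimeMs <= alignedCheckpointTimeoutMs) {
            return false; // barrier already sent, or timeout not expired yet: stay aligned
        }
        // Record every buffer ahead of the barrier as in-flight data ...
        for (String buffer : outputBuffer) {
            if (buffer.equals(BARRIER)) {
                break;
            }
            snapshottedInFlightData.add(buffer);
        }
        // ... then let the barrier overtake them.
        outputBuffer.remove(BARRIER);
        outputBuffer.addFirst(BARRIER);
        unaligned = true;
        return true;
    }

    public static void main(String[] args) {
        UpstreamTimeoutSketch sub = new UpstreamTimeoutSketch();
        sub.addData("b1");
        sub.addData("b2");
        sub.addAlignedBarrier(1_000); // checkpoint triggered at t = 1000 ms
        sub.addData("b3");
        System.out.println(sub.maybeSwitchToUnaligned(1_020, 30)); // false: 20 ms <= 30 ms
        System.out.println(sub.maybeSwitchToUnaligned(1_050, 30)); // true: 50 ms > 30 ms
        System.out.println(sub.outputBuffer);            // [BARRIER, b1, b2, b3]
        System.out.println(sub.snapshottedInFlightData); // [b1, b2]
    }
}
```

Note that {{b3}}, which was enqueued after the barrier, is not part of the snapshot: only data ahead of the barrier belongs to the checkpoint.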


