Posted to issues@flink.apache.org by "Eron Wright (JIRA)" <ji...@apache.org> on 2018/01/11 00:41:00 UTC

[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint

    [ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321463#comment-16321463 ] 

Eron Wright  commented on FLINK-7883:
-------------------------------------

Can we restate the problem that we're trying to solve? To my understanding, problem one is that cancel-with-savepoint is not atomic: cancellation happens some time after the checkpoint state is collected, so records emitted in between are replayed on restore, causing undesirable at-least-once or rollback behavior. Maybe this could be solved by enhancing the checkpoint barrier with a termination flag, which would cause the cancel function to be invoked while the checkpoint synchronization lock is still held. I don't know whether this would play nicely with CheckpointListener.
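
To make the termination-flag idea concrete, here is a minimal, self-contained sketch. It does not use Flink's classes: TerminatingBarrier, ToySourceTask and their methods are hypothetical names made up for illustration, and the single lock only stands in for the checkpoint synchronization lock. The point it tries to show is that if the snapshot and the cancellation happen under the same lock, no record can be emitted between them.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical barrier carrying a termination flag; not Flink's CheckpointBarrier. */
final class TerminatingBarrier {
    final long checkpointId;
    final boolean terminate;

    TerminatingBarrier(long checkpointId, boolean terminate) {
        this.checkpointId = checkpointId;
        this.terminate = terminate;
    }
}

/** Toy source task: emitting and snapshotting share one lock (assumed stand-in for the checkpoint lock). */
final class ToySourceTask {
    private final Object checkpointLock = new Object();
    private final List<Long> emitted = new ArrayList<>();
    private long nextOffset = 0;
    private boolean running = true;

    /** Emits one record and advances the offset atomically; returns false once cancelled. */
    boolean emitNext() {
        synchronized (checkpointLock) {
            if (!running) {
                return false;
            }
            emitted.add(nextOffset);
            nextOffset++;
            return true;
        }
    }

    /** Snapshots the offset and, if the barrier is flagged, cancels before the lock is released. */
    long onBarrier(TerminatingBarrier barrier) {
        synchronized (checkpointLock) {
            long snapshotOffset = nextOffset;
            if (barrier.terminate) {
                running = false; // cancel while the lock is still held
            }
            return snapshotOffset;
        }
    }

    /** Number of records emitted so far. */
    int emittedCount() {
        synchronized (checkpointLock) {
            return emitted.size();
        }
    }
}
```

In this toy model, every emitNext() call after a flagged barrier returns false, so the emitted records and the snapshotted offset cannot diverge. How a real implementation would interact with CheckpointListener is not covered here.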

Problem two is the existence of two similar operations, cancel and stop. Stop seems to be aimed at turning an unbounded source into a bounded source. I think it would be awesome to be able to parameterize the stop call with connector specifics, e.g. "stop at such-and-such offset". I would hesitate to co-opt the 'stop' functionality / StoppableFunction to solve problem one.

> Stop fetching source before a cancel with savepoint
> ---------------------------------------------------
>
>                 Key: FLINK-7883
>                 URL: https://issues.apache.org/jira/browse/FLINK-7883
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API, Kafka Connector, State Backends, Checkpointing
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Antoine Philippot
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call once the savepoint is finished, but during the savepoint execution the Kafka source continues to poll new messages, which will not be part of the savepoint and will be replayed on the next application start.
> A solution could be to stop fetching in the source stream task before triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a method {{stopFetching}} that existing SourceFunction implementations could implement.
> We can add a {{stopFetchingSource}} property to the {{CheckpointOptions}} class to pass the desired behaviour from {{JobManager.handleMessage(CancelJobWithSavepoint)}} to {{SourceStreamTask.triggerCheckpoint}}.
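
The proposal in the issue description could be sketched roughly as follows. Only the names StoppableFetchingSourceFunction, stopFetching and stopFetchingSource come from the description; the method signature and the classes SketchCheckpointOptions, SketchPollingSource and SketchSourceTask are assumptions made up for illustration and do not reflect Flink's actual CheckpointOptions or SourceStreamTask.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Interface and method names are from the proposal; the signature is an assumption. */
interface StoppableFetchingSourceFunction {
    void stopFetching();
}

/** Stand-in for the proposed flag on CheckpointOptions; not the real Flink class. */
final class SketchCheckpointOptions {
    final boolean stopFetchingSource;

    SketchCheckpointOptions(boolean stopFetchingSource) {
        this.stopFetchingSource = stopFetchingSource;
    }
}

/** Hypothetical source that polls until it is asked to stop fetching. */
final class SketchPollingSource implements StoppableFetchingSourceFunction {
    private final AtomicBoolean fetching = new AtomicBoolean(true);
    private long offset = 0;

    /** Simulates one poll; returns false and reads nothing once fetching has stopped. */
    synchronized boolean pollOnce() {
        if (!fetching.get()) {
            return false;
        }
        offset++;
        return true;
    }

    /** Offset that a savepoint would record. */
    synchronized long currentOffset() {
        return offset;
    }

    @Override
    public void stopFetching() {
        fetching.set(false);
    }
}

/** Hypothetical stand-in for SourceStreamTask.triggerCheckpoint. */
final class SketchSourceTask {
    private final SketchPollingSource source;

    SketchSourceTask(SketchPollingSource source) {
        this.source = source;
    }

    /** Stops fetching first when requested, then returns the offset to be snapshotted. */
    long triggerCheckpoint(SketchCheckpointOptions options) {
        if (options.stopFetchingSource) {
            source.stopFetching();
        }
        return source.currentOffset();
    }
}
```

With the flag set, polls issued after triggerCheckpoint read nothing, so the offset in the savepoint matches the last message consumed. A plain checkpoint would pass false and leave the source running.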



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)