Posted to issues@flink.apache.org by "Piotr Nowojski (Jira)" <ji...@apache.org> on 2021/01/25 18:20:00 UTC

[jira] [Commented] (FLINK-21132) BoundedOneInput.endInput is called when taking synchronous savepoint

    [ https://issues.apache.org/jira/browse/FLINK-21132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17271590#comment-17271590 ] 

Piotr Nowojski commented on FLINK-21132:
----------------------------------------

Thanks for reporting and analysing the issue. Yes, I think you are right in your description of the behaviour. I took a quick look at the code, and the problem seems to be in the {{StreamOperatorWrapper#close}} method. It is called on every clean shutdown of the task, which AFAIK can be triggered by either:
* true end of input
* stop with savepoint

Regardless of which of those two triggered the clean shutdown, end of input will be issued to every operator that is not the head operator of the chain.
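To make the asymmetry concrete, here is a minimal, hypothetical model of the behaviour described above. It is not Flink code: {{ChainShutdownModel}}, {{ShutdownReason}} and {{shutdown}} are made-up names, and operators are reduced to their names. It assumes, as I understand it, that on a true end of input the head operator is notified through the input-processing path, while the close logic only notifies non-head operators.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of an operator chain; these are NOT Flink classes.
public class ChainShutdownModel {

    // Hypothetical: the two kinds of clean shutdown listed above.
    enum ShutdownReason { END_OF_INPUT, STOP_WITH_SAVEPOINT }

    // Returns the names of the operators that receive endInput(), in call order.
    static List<String> shutdown(List<String> chain, ShutdownReason reason) {
        List<String> endInputCalls = new ArrayList<>();
        // Assumption: on a true end of input the head operator is notified
        // by the input-processing path, not by the close logic.
        if (reason == ShutdownReason.END_OF_INPUT && !chain.isEmpty()) {
            endInputCalls.add(chain.get(0));
        }
        // Modelled on the close logic described above: every non-head
        // operator gets endInput(), whatever the shutdown reason.
        for (int i = 1; i < chain.size(); i++) {
            endInputCalls.add(chain.get(i));
        }
        return endInputCalls;
    }

    public static void main(String[] args) {
        List<String> chain = List.of("head", "map", "sink");
        // Prints [head, map, sink]
        System.out.println(shutdown(chain, ShutdownReason.END_OF_INPUT));
        // Prints [map, sink]: the head is skipped, but the rest still see end of input
        System.out.println(shutdown(chain, ShutdownReason.STOP_WITH_SAVEPOINT));
    }
}
```

In this model a stop-with-savepoint leaves the chain in a mixed state: the head operator believes its input is still open, while every downstream operator in the same chain has been told its input has ended.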

Depending on how we look at it, there is one of two bugs:

# If we assume stop with savepoint shouldn't trigger end of input, then non-head operators shouldn't receive end of input either.
# If we assume stop with savepoint should trigger end of input, then the head operator should receive end of input as well.
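Either option makes the chain consistent. The following hypothetical sketch (again not Flink code; {{ConsistentShutdownModel}}, {{Policy}} and {{shutdown}} are made-up names) shows the invariant both fixes share: the decision whether stop-with-savepoint ends input is applied to the whole chain, head operator included.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two candidate fixes; NOT Flink's actual code.
public class ConsistentShutdownModel {

    enum ShutdownReason { END_OF_INPUT, STOP_WITH_SAVEPOINT }

    // Option 1: stop-with-savepoint does not end input.
    // Option 2: stop-with-savepoint ends input.
    enum Policy { SAVEPOINT_DOES_NOT_END_INPUT, SAVEPOINT_ENDS_INPUT }

    // Returns the names of the operators that receive endInput(), in chain order.
    static List<String> shutdown(List<String> chain, ShutdownReason reason, Policy policy) {
        boolean endInput = reason == ShutdownReason.END_OF_INPUT
                || policy == Policy.SAVEPOINT_ENDS_INPUT;
        // Under either policy the decision covers the whole chain,
        // head included, so operators never disagree about end of input.
        return endInput ? new ArrayList<>(chain) : new ArrayList<>();
    }

    public static void main(String[] args) {
        List<String> chain = List.of("head", "map", "sink");
        // Option 1: prints []
        System.out.println(shutdown(chain, ShutdownReason.STOP_WITH_SAVEPOINT, Policy.SAVEPOINT_DOES_NOT_END_INPUT));
        // Option 2: prints [head, map, sink]
        System.out.println(shutdown(chain, ShutdownReason.STOP_WITH_SAVEPOINT, Policy.SAVEPOINT_ENDS_INPUT));
    }
}
```

Which policy is right is exactly the open question; the sketch only captures that the current "non-head only" behaviour matches neither.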

[~aljoscha] [~AHeise], what do you think?

> BoundedOneInput.endInput is called when taking synchronous savepoint
> --------------------------------------------------------------------
>
>                 Key: FLINK-21132
>                 URL: https://issues.apache.org/jira/browse/FLINK-21132
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.10.2, 1.10.3, 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Major
>
> [~elkhand](?) reported in the Iceberg project that {{BoundedOneInput.endInput}} was [called|https://github.com/apache/iceberg/issues/2033#issuecomment-765864038] when [stopping a job with a savepoint|https://github.com/apache/iceberg/issues/2033#issuecomment-765557995].
> I think it is a bug in Flink, introduced in FLINK-14230. The [changes|https://github.com/apache/flink/pull/9854/files#diff-0c5fe245445b932fa83fdaf5c4802dbe671b73e8d76f39058c6eaaaffd9639faL577] rely on the assumption that {{StreamTask.afterInvoke}} and {{OperatorChain.closeOperators}} are only invoked after *end of input*. But that has not held since [FLIP-34: Terminate/Suspend Job with Savepoint|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212], which predates those changes. A task can enter the [*finished*|https://github.com/apache/flink/blob/3a8e06cd16480eacbbf0c10f36b8c79a6f741814/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L467] state after a synchronous savepoint, as an expected job suspension or stop.
> [~sunhaibotb] [~pnowojski] [~roman_khachatryan] Could you help confirm this?
> For full context, see [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033]. I have pushed the branch [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case] to my repository. The test case {{SavepointITCase.testStopSavepointWithBoundedInput}} failed because {{BoundedOneInput.endInput}} was called.
> I am also aware of [FLIP-147: Support Checkpoints After Tasks Finished|https://cwiki.apache.org/confluence/x/mw-ZCQ]; perhaps all three should agree on what exactly *finished* means. [~kkl0u] [~chesnay] [~gaoyunhaii]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)