Posted to issues@flink.apache.org by "Jiayi Liao (JIRA)" <ji...@apache.org> on 2018/10/27 07:47:00 UTC

[jira] [Comment Edited] (FLINK-5601) Window operator does not checkpoint watermarks

    [ https://issues.apache.org/jira/browse/FLINK-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16665975#comment-16665975 ] 

Jiayi Liao edited comment on FLINK-5601 at 10/27/18 7:46 AM:
-------------------------------------------------------------

[~aljoscha][~kkl0u]

Here is my idea:

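In TimestampsAndPeriodicWatermarksOperator and TimestampsAndPunctuatedWatermarksOperator:
* Add a new field, ListState<Long> restoreWatermarks, that holds the watermarks to restore.
* Override snapshotState() to store the current watermark in that list state.
* Override initializeState() to retrieve the watermarks via getUnionListState(), take the lowest one as the current watermark, and emit that watermark immediately. With union list state every parallel instance receives the watermarks of all previous instances, so taking the minimum is the conservative choice: the restored watermark is never ahead of any instance's watermark at checkpoint time.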
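The three steps above can be simulated without Flink. This is a minimal, hypothetical model, not Flink code: SubtaskSketch stands in for the two watermark operators, a plain List<Long> plays the role of the union ListState<Long> restoreWatermarks, and all class and method names are made up for illustration. It checkpoints three subtasks, rescales to two, and shows each restored subtask adopting and emitting the lowest watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free simulation of the proposal. SubtaskSketch stands in for
// TimestampsAndPeriodicWatermarksOperator / TimestampsAndPunctuatedWatermarksOperator;
// a plain List<Long> plays the role of the union ListState<Long> restoreWatermarks.
class SubtaskSketch {
    long currentWatermark = Long.MIN_VALUE;       // assumed initial watermark value
    final List<Long> emitted = new ArrayList<>(); // watermarks sent downstream

    // snapshotState(): write this subtask's current watermark into its list state.
    List<Long> snapshotState() {
        List<Long> state = new ArrayList<>();
        state.add(currentWatermark);
        return state;
    }

    // initializeState(): with union redistribution every subtask sees all entries.
    // Take the lowest so no restored subtask runs ahead, then emit it at once.
    void initializeState(List<Long> unionState) {
        if (unionState.isEmpty()) {
            return; // fresh start: nothing to restore
        }
        long lowest = Long.MAX_VALUE;
        for (long wm : unionState) {
            lowest = Math.min(lowest, wm);
        }
        currentWatermark = lowest;
        emitted.add(lowest);
    }
}

public class WatermarkRestoreSketch {
    // Collect every old subtask's snapshot into the union list each new subtask receives.
    static List<Long> union(List<SubtaskSketch> oldSubtasks) {
        List<Long> all = new ArrayList<>();
        for (SubtaskSketch s : oldSubtasks) {
            all.addAll(s.snapshotState());
        }
        return all;
    }

    public static void main(String[] args) {
        // Three subtasks before the checkpoint, each with a different watermark.
        List<SubtaskSketch> before = new ArrayList<>();
        for (long wm : new long[] {1_000L, 700L, 1_200L}) {
            SubtaskSketch s = new SubtaskSketch();
            s.currentWatermark = wm;
            before.add(s);
        }
        List<Long> unionState = union(before);

        // Rescale to two subtasks; both restore from the same union list.
        for (int i = 0; i < 2; i++) {
            SubtaskSketch restored = new SubtaskSketch();
            restored.initializeState(unionState);
            System.out.println("subtask " + i + " restored watermark " + restored.currentWatermark);
        }
    }
}
```

Running it prints that both restored subtasks start at 700. In this sketch, a subtask that had not yet seen any watermark snapshots Long.MIN_VALUE and pulls the minimum all the way down, which is safe but means event time would have to be re-established, as it is today.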

Tests:
* Test that the watermark state is checkpointed.
* Test that initializeState() picks the lowest of the restored watermarks.

Because the watermark is emitted in initializeState(), it should be emitted before any other elements from the source, but I'm not completely sure about this. Does anyone know whether this carries a risk?


What do you think?

> Window operator does not checkpoint watermarks
> ----------------------------------------------
>
>                 Key: FLINK-5601
>                 URL: https://issues.apache.org/jira/browse/FLINK-5601
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.6.0, 1.7.0
>            Reporter: Ufuk Celebi
>            Assignee: Jiayi Liao
>            Priority: Critical
>             Fix For: 1.8.0
>
>
> During release testing, [~stefanrichter83@gmail.com] and I noticed that watermarks are not checkpointed in the window operator.
> This can lead to non-determinism when restoring checkpoints. I was running an adjusted {{SessionWindowITCase}} via Kafka for testing migration and rescaling and ran into failures, because the data generator required deterministic behaviour.
> What happened was that, after a restore, late elements were sometimes not dropped, because the watermarks first had to be re-established.
> [~aljoscha] Do you know whether there is a special reason for explicitly not checkpointing watermarks?
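To see why late elements slip through after a restore, here is a simplified, hypothetical event-time lateness check: an element counts as late once its timestamp plus the allowed lateness is at or behind the current watermark. It is not Flink's WindowOperator code; the class and method names are made up, and it only assumes that an operator whose watermark was not checkpointed restarts from Long.MIN_VALUE.

```java
// Hypothetical sketch of an event-time lateness check; not Flink's WindowOperator code.
public class LatenessAfterRestoreSketch {
    // An element is late once its timestamp plus the allowed lateness
    // is at or behind the current watermark.
    static boolean isLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        long allowedLateness = 0L;
        long lateElement = 500L;

        long watermarkBeforeCheckpoint = 1_000L;
        // Assumption: the watermark is not checkpointed, so after restore it starts from scratch.
        long watermarkAfterRestore = Long.MIN_VALUE;

        System.out.println("dropped before checkpoint: "
                + isLate(lateElement, allowedLateness, watermarkBeforeCheckpoint));
        System.out.println("dropped after restore: "
                + isLate(lateElement, allowedLateness, watermarkAfterRestore));
    }
}
```

The same element is dropped before the checkpoint (true) but accepted after the restore (false) until a new watermark catches up, which is the non-determinism described above.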



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)