Posted to user@flink.apache.org by Sung Gon Yi <sk...@mac.com> on 2021/04/15 01:04:09 UTC

WindowFunction is stuck until next message is processed although Watermark with idle timeout is applied.

Hello,

I have a question about watermarks with an idle timeout.

I made an example of it: https://github.com/skonmeme/rare_stream/blob/main/src/main/scala/com/skonuniverse/flink/RareStreamWithIdealTimeout.scala

There is a WindowFunction with a 5-second tumbling window, and messages arrive once every 120 seconds.
The idle timeout is set to 30 seconds.

However, when I run it, the first message is not processed until 120 seconds have passed, that is, until the next message has arrived.

Please tell me what I have misunderstood about the idle timeout and how to solve this problem.

Thanks,
Sung Gon

Re: WindowFunction is stuck until next message is processed although Watermark with idle timeout is applied.

Posted by David Anderson <da...@apache.org>.
The withIdleness option does not attempt to handle situations where all of
the sources are idle.

Flink operators with multiple input channels keep track of the current
watermark from each channel, and use the minimum of these watermarks as
their own watermark. withIdleness marks idle channels as inactive, which
then indicates to the runtime that those channels should not be taken into
consideration, so that their stalled watermark won't hold back the overall
watermark. This isn't enough to keep the watermarks advancing when
everything is idle.
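
To make the rule above concrete, here is a small sketch in plain Java. It is a simplified model and not Flink's actual implementation: the class MinWatermarkModel and the method combine are hypothetical names invented for this illustration. It takes the minimum over the channels that are not marked idle, and leaves the watermark unchanged when every channel is idle.

```java
public class MinWatermarkModel {

    /**
     * Hypothetical helper: combines per-channel watermarks into one operator watermark.
     * Idle channels are skipped; if all channels are idle, the current watermark is kept.
     */
    static long combine(long current, long[] channelWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (!anyActive) {
            // Nothing to take the minimum over, so the watermark cannot advance.
            return current;
        }
        // Watermarks never move backwards.
        return Math.max(current, min);
    }

    public static void main(String[] args) {
        long wm = Long.MIN_VALUE;
        // Both channels active: the slower channel (1000) holds the watermark back.
        wm = combine(wm, new long[] {5000L, 1000L}, new boolean[] {false, false});
        System.out.println(wm);
        // Slow channel marked idle: the watermark follows the remaining active channel.
        wm = combine(wm, new long[] {5000L, 1000L}, new boolean[] {false, true});
        System.out.println(wm);
        // Every channel idle: the watermark stays where it was.
        wm = combine(wm, new long[] {5000L, 1000L}, new boolean[] {true, true});
        System.out.println(wm);
    }
}
```

In the scenario from the question there is a single source, so once it goes idle the last case applies and the 5-second window has nothing to trigger it.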

Advancing the watermark when no events are flowing at all is something you
can do in your application, by implementing your own idleness detection and
advancing the watermark artificially based on the progress of system time.
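
One possible shape for such idleness detection is sketched below in plain Java, without any Flink dependency. Everything in it is an assumption made for illustration: the class IdleAdvancingWatermark, its parameters, and the policy of adding the idle wall-clock time to the last seen event timestamp. The two methods loosely mirror the onEvent and onPeriodicEmit callbacks of Flink's WatermarkGenerator interface, but they return a plain long instead of writing to a WatermarkOutput, and the clock is injected so the behavior can be checked without waiting.

```java
import java.util.function.LongSupplier;

public class IdleAdvancingWatermark {
    private final long maxOutOfOrdernessMs; // tolerated event-time disorder
    private final long idleTimeoutMs;       // wall-clock silence before advancing artificially
    private final LongSupplier clock;       // wall-clock source, e.g. System::currentTimeMillis

    private long maxEventTs = Long.MIN_VALUE;
    private long lastEventWallMs;
    private long watermark = Long.MIN_VALUE;

    public IdleAdvancingWatermark(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
        this.lastEventWallMs = clock.getAsLong();
    }

    /** Called for every event: remember the highest timestamp and when it was seen. */
    public void onEvent(long eventTs) {
        maxEventTs = Math.max(maxEventTs, eventTs);
        lastEventWallMs = clock.getAsLong();
    }

    /** Called periodically: returns the watermark to emit. */
    public long onPeriodicEmit() {
        if (maxEventTs == Long.MIN_VALUE) {
            return watermark; // no event seen yet, nothing to base a watermark on
        }
        long candidate = maxEventTs - maxOutOfOrdernessMs;
        long idleFor = clock.getAsLong() - lastEventWallMs;
        if (idleFor >= idleTimeoutMs) {
            // Assumed policy: treat wall-clock silence as event-time progress.
            candidate += idleFor;
        }
        watermark = Math.max(watermark, candidate); // never move backwards
        return watermark;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        IdleAdvancingWatermark gen = new IdleAdvancingWatermark(0L, 30_000L, () -> now[0]);
        gen.onEvent(1_000L);
        System.out.println(gen.onPeriodicEmit()); // watermark while the source is still active
        now[0] = 30_000L;
        System.out.println(gen.onPeriodicEmit()); // watermark after 30 s of silence
    }
}
```

With a 30-second timeout and a 5-second window, the artificial advance would move the watermark past the window end once the source has been silent for 30 seconds. The trade-off is that any event arriving afterwards with an older timestamp would then be treated as late.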

Regards,
David