Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2020/12/09 16:21:15 UTC

What happens when all input partitions become idle

Hi,

Let's consider two operators: A (parallelism=2) and B (parallelism=1).
B has two input partitions, B_A1 and B_A2, which are connected to A1 and A2
respectively.

At some point,
- B_A1's watermark : 12
- B_A2's watermark : 10
- B's event-time clock : 10 = min(12, 10)
- B has registered a timer at 12
- No data will be fed into the pipeline for the next few hours, but I want
the timer to fire within a few seconds when no data arrives.

After adopting the watermark strategy explained in [1], I found that the
timer fires as I wished! That's awesome!

But I want to know in detail what happens internally.
Based on my current understanding of how watermarks are calculated [2], I
cannot work out what happens when idleness is taken into account.
If B_A2 is marked idle earlier than B_A1, is B's event-time clock
calculated as min(12, MAX_WATERMARK)?

Thanks,

Dongwon

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#how-operators-process-watermarks
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners

Re: What happens when all input partitions become idle

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Benchao,

Thanks for the input.

The code is self-explanatory.

Best,

Dongwon


On Thu, Dec 10, 2020 at 12:20 PM Benchao Li <li...@apache.org> wrote:

> Hi Dongwon,
>
> I think you understand it correctly.
> You can find this logic here [1].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java#L108
>
> --
>
> Best,
> Benchao Li
>

Re: What happens when all input partitions become idle

Posted by Benchao Li <li...@apache.org>.
Hi Dongwon,

I think you understand it correctly.
You can find this logic here [1].

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValve.java#L108
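
The rule discussed in this thread can be illustrated with a small,
self-contained model. This is only a sketch and not the code of
StatusWatermarkValve: the class name IdleAwareWatermarkSketch and its
methods onWatermark and onIdle are hypothetical. It assumes that idle
channels are simply skipped when taking the minimum, with the search
starting from Long.MAX_VALUE, which is why the result behaves like
min(12, MAX_WATERMARK). It does not try to model what the real valve does
once every channel is idle; here the clock just keeps its last value.

```java
import java.util.Arrays;

/** Simplified, hypothetical model of combining per-channel watermarks while skipping idle channels. */
final class IdleAwareWatermarkSketch {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;
    private long lastOutputWatermark = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        channelIdle = new boolean[numChannels]; // all channels start active
    }

    /** Records a watermark from one channel and returns the operator's event-time clock. */
    long onWatermark(int channel, long watermark) {
        // Simplification: receiving a watermark makes the channel active again.
        channelIdle[channel] = false;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return recompute();
    }

    /** Marks one channel idle and returns the operator's event-time clock. */
    long onIdle(int channel) {
        channelIdle[channel] = true;
        return recompute();
    }

    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) { // idle channels do not hold the clock back
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // The clock only moves forward, and only if an active channel remains.
        if (anyActive && min > lastOutputWatermark) {
            lastOutputWatermark = min;
        }
        return lastOutputWatermark;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch b = new IdleAwareWatermarkSketch(2);
        b.onWatermark(0, 12);                      // B_A1 is at 12
        System.out.println(b.onWatermark(1, 10));  // prints 10 = min(12, 10)
        System.out.println(b.onIdle(1));           // prints 12: B_A2 no longer counts
    }
}
```

With the numbers from the question, the clock moves from 10 to 12 as soon
as B_A2 is marked idle, so a timer registered at 12 becomes eligible to
fire.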


-- 

Best,
Benchao Li