Posted to user@beam.apache.org by Talat Uyarer via user <us...@beam.apache.org> on 2023/05/19 09:42:59 UTC

Watermark Alignment on Flink Runner's UnboundedSourceWrapper

Hi All,

I have a stream aggregation job which reads from Kafka and writes to some
sinks.

When I submit my job, the Flink checkpoint size keeps increasing if I use
unaligned checkpoints, and the job does not emit any window results.
If I use aligned checkpoints, the size is somewhat under control (still
big), but checkpoint alignment takes a long time.

I would like to implement something similar to [1]. I believe that if
UnboundedSourceWrapper paused reading partitions whose watermarks are too
far in the future (ahead of the other partitions), it would reduce the size
of the checkpoint and I could use unaligned checkpointing. What do you
think about this approach? Do you have another solution?
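A minimal sketch of the pause decision described above, assuming each partition's watermark is tracked in epoch milliseconds and that a partition is paused once it runs more than a fixed drift ahead of the slowest partition. `SplitPauser` and `maxDriftMillis` are hypothetical names for illustration; they are not part of Beam's UnboundedSourceWrapper or any Beam API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not a Beam class): decides which partitions a reader
// should stop polling because their watermark is too far ahead of the slowest.
class SplitPauser {
  private final long maxDriftMillis;
  private final Map<String, Long> watermarks = new HashMap<>();

  SplitPauser(long maxDriftMillis) {
    if (maxDriftMillis < 0) {
      throw new IllegalArgumentException("maxDriftMillis must be >= 0");
    }
    this.maxDriftMillis = maxDriftMillis;
  }

  // Records the latest watermark (epoch millis) observed for a partition.
  void update(String partition, long watermarkMillis) {
    watermarks.put(partition, watermarkMillis);
  }

  // Watermark of the slowest partition; Long.MAX_VALUE if nothing is tracked yet.
  long minWatermark() {
    long min = Long.MAX_VALUE;
    for (long wm : watermarks.values()) {
      min = Math.min(min, wm);
    }
    return min;
  }

  // Partitions whose watermark exceeds the slowest one by more than maxDriftMillis.
  Set<String> partitionsToPause() {
    Set<String> paused = new HashSet<>();
    long min = minWatermark();
    // Saturating add so a very large watermark cannot overflow the limit.
    long limit = min > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : min + maxDriftMillis;
    for (Map.Entry<String, Long> e : watermarks.entrySet()) {
      if (e.getValue() > limit) {
        paused.add(e.getKey());
      }
    }
    return paused;
  }
}
```

The reader would skip polling the returned partitions until the minimum watermark catches up. One design caveat: an idle partition whose watermark never advances would hold back every other partition, so a real implementation would likely need to exclude idle partitions from the minimum.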

One more question: while reading the code to implement the above idea, I
saw this code [2]. Does the Flink Runner have a similar implementation?

Thanks

[1] https://github.com/apache/flink/pull/11968
[2] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L207

Re: Watermark Alignment on Flink Runner's UnboundedSourceWrapper

Posted by Jan Lukavský <je...@seznam.cz>.
Yes, FlinkRunner supports Beam's event-time semantics without any 
additional configuration options.

  Jan

On 5/23/23 09:52, Talat Uyarer via dev wrote:
> Hi Jan,
>
> Yes, my plan is to implement this feature in FlinkRunner. I have one
> more question: does the Flink Runner support event time or Beam
> custom watermarks? Do I need to set AutoWatermarkInterval for
> stateful Beam Flink jobs, or can Beam timers handle it without
> setting that parameter?
>
> Thanks
>

Re: Watermark Alignment on Flink Runner's UnboundedSourceWrapper

Posted by Talat Uyarer via dev <de...@beam.apache.org>.
Hi Jan,

Yes, my plan is to implement this feature in FlinkRunner. I have one more
question: does the Flink Runner support event time or Beam custom
watermarks? Do I need to set AutoWatermarkInterval for stateful Beam Flink
jobs, or can Beam timers handle it without setting that parameter?

Thanks

On Tue, May 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:

> [...]

Re: Watermark Alignment on Flink Runner's UnboundedSourceWrapper

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Talat,

your analysis is correct: aligning watermarks for jobs with high
watermark skew across input partitions does result in faster checkpoints
and smaller state. There are generally two places you can implement
this: in user code (the source) or inside the runner. The user code can
use some external synchronization (e.g. ZooKeeper) to keep track of the
progress of all individual sources. Another option is to read the
watermark from Flink's REST API (some inspiration here [1]).
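A minimal sketch of the external-synchronization option, assuming each source instance periodically publishes its local watermark to shared storage and throttles itself while it is too far ahead of the global minimum. An in-memory ConcurrentHashMap stands in for ZooKeeper or for polling Flink's REST API here; `GlobalWatermarkTracker` and its methods are hypothetical and do not reflect the API of the tracker linked in [1].

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for an external store (e.g. ZooKeeper);
// a real deployment would read and write these values remotely.
class GlobalWatermarkTracker {
  private final Map<String, Long> localWatermarks = new ConcurrentHashMap<>();

  // Each source instance reports its current watermark (epoch millis).
  // Math::max keeps a stale or reordered report from moving a watermark backwards.
  void report(String sourceId, long watermarkMillis) {
    localWatermarks.merge(sourceId, watermarkMillis, Math::max);
  }

  // Global watermark: the minimum over all sources that have reported so far.
  long globalWatermark() {
    return localWatermarks.values().stream()
        .mapToLong(Long::longValue)
        .min()
        .orElse(Long.MIN_VALUE);
  }

  // True while this source is more than maxDriftMillis ahead of the global watermark.
  boolean shouldThrottle(String sourceId, long maxDriftMillis) {
    Long local = localWatermarks.get(sourceId);
    if (local == null) {
      return false; // unknown source: nothing to compare yet
    }
    return local - globalWatermark() > maxDriftMillis;
  }
}
```

Note that the minimum only covers sources that have already reported; a real implementation would probably need to know the expected number of sources (or have each one register up front) so that a late-starting reader is not silently ignored.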

Another option would be to make use of [2] and implement this directly
in FlinkRunner. I'm not familiar with its possible limitations, as it
was added to Flink quite recently (we would have to support it only
when running on Flink 1.15+).
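FLIP-182 [2] moves this coordination into Flink itself: on Flink 1.15+, a FLIP-27 source opts in through `WatermarkStrategy#withWatermarkAlignment(watermarkGroup, maxAllowedWatermarkDrift, updateInterval)`, the sources in one group report their watermarks to a coordinator, and each reader pauses while its watermark is above the announced maximum. The plain-Java simulation below only models that protocol, assuming the coordinator announces min(reported watermarks) + maxAllowedWatermarkDrift; `AlignmentCoordinatorSim` is a hypothetical name, not a Flink class.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical simulation of the FLIP-182 coordination round for one
// alignment group; not Flink's implementation.
class AlignmentCoordinatorSim {
  private final long maxDriftMillis;
  private final Map<Integer, Long> reportedWatermarks = new HashMap<>();

  AlignmentCoordinatorSim(Duration maxAllowedWatermarkDrift) {
    if (maxAllowedWatermarkDrift.isNegative()) {
      throw new IllegalArgumentException("drift must not be negative");
    }
    this.maxDriftMillis = maxAllowedWatermarkDrift.toMillis();
  }

  // A reader, identified by its subtask index, reports its current watermark.
  void report(int subtask, long watermarkMillis) {
    reportedWatermarks.put(subtask, watermarkMillis);
  }

  // Computed once per update interval and broadcast to every reader in the group.
  long announceMaxAllowedWatermark() {
    OptionalLong min = reportedWatermarks.values().stream().mapToLong(Long::longValue).min();
    if (!min.isPresent()) {
      return Long.MAX_VALUE; // no reports yet: do not hold anyone back
    }
    long m = min.getAsLong();
    // Saturating add to avoid overflow for very large watermarks.
    return m > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : m + maxDriftMillis;
  }

  // Reader-side rule: pause while the local watermark is above the announced maximum.
  static boolean shouldPause(long readerWatermarkMillis, long maxAllowedWatermarkMillis) {
    return readerWatermarkMillis > maxAllowedWatermarkMillis;
  }
}
```

Because only a single number is broadcast per round, readers never need to see each other's watermarks; the trade-off is that a reader reacts to a threshold that may be up to one update interval old.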

If you would like to go for the second approach, I'd be happy to help 
with some guidance.

Best,

  Jan

[1] https://github.com/O2-Czech-Republic/proxima-platform/blob/master/flink/utils/src/main/java/cz/o2/proxima/flink/utils/FlinkGlobalWatermarkTracker.java
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

On 5/23/23 01:05, Talat Uyarer via dev wrote:
> [...]

Watermark Alignment on Flink Runner's UnboundedSourceWrapper

Posted by Talat Uyarer via dev <de...@beam.apache.org>.
Maybe the user list does not have knowledge about this. That's why I am
also resending this to the dev list. Sorry for cross-posting.


Hi All,

I have a stream aggregation job which reads from Kafka and writes to some
sinks.

When I submit my job, the Flink checkpoint size keeps increasing if I use
unaligned checkpoints, and the job does not emit any window results.
If I use aligned checkpoints, the size is somewhat under control (still
big), but checkpoint alignment takes a long time.

I would like to implement something similar to [1]. I believe that if
UnboundedSourceWrapper paused reading partitions whose watermarks are too
far in the future (ahead of the other partitions), it would reduce the size
of the checkpoint and I could use unaligned checkpointing. What do you
think about this approach? Do you have another solution?

One more question: while reading the code to implement the above idea, I
saw this code [2]. Does the Flink Runner have a similar implementation?

Thanks

[1] https://github.com/apache/flink/pull/11968
[2] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L207