Posted to user@flink.apache.org by Alexis Sarda-Espinosa <al...@microfocus.com> on 2022/03/10 18:26:37 UTC

Interval join operator is not forwarding watermarks correctly

Hello,

I'm in the process of updating from Flink 1.11.3 to 1.14.3, and it seems the interval join in my pipeline is no longer working. More specifically, I have a sliding window after the interval join, and the window isn't firing. After many tests, I ended up creating a custom operator that extends IntervalJoinOperator and overrides processWatermark1() and processWatermark2() to add logs and check when they are called. I can see that processWatermark1() isn't called.

For completeness, this is how I use my custom operator:

joinOperator = new CustomIntervalJoinOperator(...);

stream1.connect(stream2)
    .keyBy(selector1, selector2)
    .transform("Interval Join", TypeInformation.of(Pojo.class), joinOperator);
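To illustrate why a missing processWatermark1() call is enough to stall the downstream window, here is a minimal, hypothetical model of how a two-input operator derives its output watermark: it can only advance to the minimum of the latest watermarks received on its two inputs. The class TwoInputWatermarkModel and its methods are made up for this sketch; it is not Flink's IntervalJoinOperator or AbstractStreamOperator code, and it ignores idleness entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model (not Flink code) of a two-input operator's
// watermark handling: the output watermark is min(input 1, input 2).
class TwoInputWatermarkModel {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long emittedWatermark = Long.MIN_VALUE;

    // Watermarks that would be forwarded downstream, e.g. to a sliding window.
    final List<Long> forwarded = new ArrayList<>();

    void processWatermark1(long watermark) {
        input1Watermark = Math.max(input1Watermark, watermark);
        maybeForward();
    }

    void processWatermark2(long watermark) {
        input2Watermark = Math.max(input2Watermark, watermark);
        maybeForward();
    }

    // Forward only when the combined (minimum) watermark actually advances.
    private void maybeForward() {
        long combined = Math.min(input1Watermark, input2Watermark);
        if (combined > emittedWatermark) {
            emittedWatermark = combined;
            forwarded.add(combined);
        }
    }

    long currentWatermark() {
        return emittedWatermark;
    }
}
```

In this model, if only processWatermark2() is ever called, currentWatermark() stays at Long.MIN_VALUE no matter how far input 2 advances, so no event-time window downstream could fire.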

---

Some more information in case it's relevant:

- stream2 is obtained from a side output.
- both stream1 and stream2 have watermarks assigned by custom strategies. I also log watermark creation, and I can see that watermarks are indeed emitted as expected in both streams.

Strangely, my watermark strategies mark themselves idle if they don't receive new events for 10 minutes, and if I send some events and then wait 10 minutes, processWatermark1() is called! On the other hand, if I send events continuously, it is never called.
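The idleness behaviour described above can be modelled roughly as follows. This is a hypothetical sketch, not the author's actual strategy and not Flink's WatermarkGenerator interface: IdleAwareGenerator, its methods and the explicit nowMillis clock parameter are assumptions made so the logic can be exercised without a Flink runtime. The watermark formula (max timestamp minus allowed out-of-orderness minus 1) follows the usual bounded-out-of-orderness approach.

```java
// Hypothetical sketch (not Flink's API): a generator that emits
// bounded-out-of-orderness watermarks and reports itself idle once no event
// has arrived for idleTimeoutMillis of processing time.
class IdleAwareGenerator {
    private final long maxOutOfOrdernessMillis;
    private final long idleTimeoutMillis;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventProcessingTime;
    private boolean idle = false;

    IdleAwareGenerator(long maxOutOfOrdernessMillis, long idleTimeoutMillis, long startProcessingTime) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastEventProcessingTime = startProcessingTime;
    }

    // Every event updates the max timestamp and makes the generator active again.
    void onEvent(long eventTimestamp, long nowMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventProcessingTime = nowMillis;
        idle = false;
    }

    // Called periodically; returns the watermark to emit, or null if the
    // generator is idle or has not seen any event yet.
    Long onPeriodicEmit(long nowMillis) {
        if (nowMillis - lastEventProcessingTime >= idleTimeoutMillis) {
            idle = true;
            return null;
        }
        if (maxTimestamp == Long.MIN_VALUE) {
            return null;
        }
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    boolean isIdle() {
        return idle;
    }
}
```

Under continuous traffic isIdle() never becomes true in this model, which is consistent with the observation that processWatermark1() is only called after a 10-minute pause.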

Is this a known issue?

Regards,
Alexis.


RE: Interval join operator is not forwarding watermarks correctly

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
Hi Dawid,

Thanks for the update. I also managed to work around it by adding another watermark assignment operator between the join and the window. I’ll have to see if it’s possible to assign watermarks at the source, but even if it is, I worry that the different partitions created by all my keyBy() steps would make it difficult for me to figure out which parts of my pipeline should be idle.

Regards,
Alexis.

Re: Interval join operator is not forwarding watermarks correctly

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Alexis,

I tried looking into your example. First of all, so far I've spent only
a limited time looking at the WatermarkGenerator, and I have not
thoroughly understood how it works. I'd discourage assigning watermarks
anywhere in the middle of your pipeline. This is considered to be an
anti-pattern, and at some point there were thoughts about removing that
possibility.


Having said that, there is indeed a bug in the
TimestampsAndWatermarksOperator [1]. The issue is that
WatermarkGenerators in the middle of a pipeline cut off upstream
watermarks, but they do not cut off WatermarkStatuses. Consider a
chain of generators G1, G2, G3, each with parallelism 2. In your
example only a single subtask receives records (because of the
constant key), so only a single subtask is expected to generate
watermarks. What happens is that generator G3 starts with one
subtask IDLE and one ACTIVE, but it receives an ACTIVE status from
the upstream generator G2 because of all the cross connections
(keyBy) between operators. It therefore marks both channels ACTIVE,
but only one of them generates watermarks.
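The effect described above can be reproduced with a toy model of a downstream subtask that merges watermarks from several input channels: only channels whose status is ACTIVE take part, and the result is the minimum over them. ChannelWatermarkValve is a hypothetical, simplified class written for this illustration; Flink's internal handling of watermark statuses is more involved.

```java
import java.util.Arrays;

// Hypothetical, simplified model (not Flink code) of how a subtask combines
// watermarks from several input channels, taking channel status into account.
class ChannelWatermarkValve {
    enum Status { ACTIVE, IDLE }

    private final long[] watermarks;
    private final Status[] statuses;

    ChannelWatermarkValve(int numChannels) {
        watermarks = new long[numChannels];
        statuses = new Status[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        Arrays.fill(statuses, Status.ACTIVE);
    }

    void onWatermark(int channel, long watermark) {
        watermarks[channel] = Math.max(watermarks[channel], watermark);
    }

    void onStatus(int channel, Status status) {
        statuses[channel] = status;
    }

    // Minimum watermark over ACTIVE channels only. If every channel is IDLE,
    // this simplified model reports no progress (Long.MIN_VALUE).
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (statuses[i] == Status.ACTIVE) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

In the test, channel 1 stands for the subtask that never receives records: as long as it is marked ACTIVE, it pins the combined watermark to Long.MIN_VALUE; once it is IDLE, the watermark from channel 0 gets through.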


My recommendation is to keep watermark generation right after the
source. If that is not possible, then as a workaround until this is
fixed in Flink, you need to cut off WatermarkStatuses somehow. You
can do that either in a custom operator or by modifying the
TimestampsAndWatermarksOperator.
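Along the lines of the suggested workaround, a mid-pipeline generator could ignore both the watermarks and the statuses coming from upstream, and derive its own ACTIVE/IDLE status only from the records it actually sees. StatusCuttingGenerator is a hypothetical, Flink-independent sketch of that decision logic only; a real fix would live in a custom operator or a modified TimestampsAndWatermarksOperator, as described above, and would use Flink's own status handling.

```java
// Hypothetical sketch (not Flink code): a generator placed mid-pipeline that
// drops upstream watermarks AND upstream status changes, and decides its
// own ACTIVE/IDLE status purely from the records it receives.
class StatusCuttingGenerator {
    enum Status { ACTIVE, IDLE }

    private final long idleTimeoutMillis;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastRecordProcessingTime;
    // A subtask that never sees records stays IDLE instead of inheriting ACTIVE.
    private Status ownStatus = Status.IDLE;

    StatusCuttingGenerator(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    void onRecord(long timestamp, long nowMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        lastRecordProcessingTime = nowMillis;
        ownStatus = Status.ACTIVE;
    }

    // Upstream watermarks are replaced by this generator's own watermarks.
    void onUpstreamWatermark(long watermark) {
        // intentionally ignored
    }

    // Forwarding upstream status is what leaves record-less subtasks ACTIVE
    // downstream, so it is intentionally ignored as well.
    void onUpstreamStatus(Status status) {
        // intentionally ignored
    }

    // Switches to IDLE once no record has arrived for idleTimeoutMillis.
    Status statusToEmit(long nowMillis) {
        if (ownStatus == Status.ACTIVE && nowMillis - lastRecordProcessingTime >= idleTimeoutMillis) {
            ownStatus = Status.IDLE;
        }
        return ownStatus;
    }

    // Simplified: no out-of-orderness allowance.
    long watermarkToEmit() {
        return maxTimestamp;
    }
}
```

The key design choice is that onUpstreamStatus() is a no-op: even if an upstream ACTIVE status arrives on every channel, a subtask without records keeps reporting IDLE.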


Best,

Dawid


[1] https://issues.apache.org/jira/browse/FLINK-26708



On 15/03/2022 23:47, Alexis Sarda-Espinosa wrote:
> For completeness, this still happens with Flink 1.14.4
>
> Regards,
> Alexis.
>
> ------------------------------------------------------------------------
> *From:* Alexis Sarda-Espinosa <al...@microfocus.com>
> *Sent:* Friday, March 11, 2022 12:21 AM
> *To:* user@flink.apache.org <us...@flink.apache.org>
> *Cc:* pnowojski@apache.org <pn...@apache.org>
> *Subject:* Re: Interval join operator is not forwarding watermarks 
> correctly
> I think I managed to create a reproducible example [1], I think it's 
> due to the use of window + join + window. When I run the test, I never 
> see the print output, but if I uncomment part of the code in the 
> watermark generator to mark it as idle more quickly, it starts working 
> after a while.
>
> [1] https://github.com/asardaes/flink-interval-join-test
>
>
> Regards,
> Alexis.
>
> ------------------------------------------------------------------------
> *From:* Alexis Sarda-Espinosa <al...@microfocus.com>
> *Sent:* Thursday, March 10, 2022 7:47 PM
> *To:* user@flink.apache.org <us...@flink.apache.org>
> *Cc:* pnowojski@apache.org <pn...@apache.org>
> *Subject:* RE: Interval join operator is not forwarding watermarks 
> correctly
>
> I found [1] and [2], which are closed, but could be related?
>
> [1] https://issues.apache.org/jira/browse/FLINK-23698
>
> [2] https://issues.apache.org/jira/browse/FLINK-18934
>
> Regards,
>
> Alexis.

Re: Interval join operator is not forwarding watermarks correctly

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
For completeness, this still happens with Flink 1.14.4.

Regards,
Alexis.

Re: Interval join operator is not forwarding watermarks correctly

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
I think I managed to create a reproducible example [1]; I suspect it's due to the use of window + join + window. When I run the test, I never see the print output, but if I uncomment the part of the watermark generator code that marks it as idle more quickly, it starts working after a while.

[1] https://github.com/asardaes/flink-interval-join-test


Regards,
Alexis.

RE: Interval join operator is not forwarding watermarks correctly

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
I found [1] and [2]. They are closed, but could they be related?

[1] https://issues.apache.org/jira/browse/FLINK-23698
[2] https://issues.apache.org/jira/browse/FLINK-18934

Regards,
Alexis.
