Posted to user@flink.apache.org by Remigiusz Janeczek <ca...@gmail.com> on 2023/02/27 13:09:40 UTC

Fast and slow stream sources for Interval Join

Hi,

How can I handle a case where one of the Kafka topics used for an interval
join is slower than the other (or where one topic lags behind)?
Is there a way to stop consuming from the fast topic and wait for the slow
one to catch up? I want to avoid running out of memory (or keeping a very
large state), and I don't want to discard any data from the fast topic until
a watermark from the slow topic allows it.
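To make the memory concern concrete, here is a simplified Java model of how an interval join buffers records. The class and method names are hypothetical, and this is not Flink's actual IntervalJoinOperator. It assumes the join condition a.ts + lowerBound <= b.ts <= a.ts + upperBound, and that a two-input operator's event time is the minimum of its two input watermarks. It also assumes a buffered record is evicted as soon as that combined watermark is strictly past the record's cleanup time. Under these assumptions, records from the fast side can only be dropped once the slow side's watermark catches up.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (hypothetical, NOT Flink's IntervalJoinOperator) of an interval join
 * that matches left record a with right record b when
 * a.ts + lowerBound <= b.ts <= a.ts + upperBound.
 */
class IntervalJoinBufferModel {
    private final long lowerBound;
    private final long upperBound;
    private final List<Long> leftBuffer = new ArrayList<>();  // timestamps buffered from the fast input
    private final List<Long> rightBuffer = new ArrayList<>(); // timestamps buffered from the slow input
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;

    IntervalJoinBufferModel(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void addLeft(long ts) { leftBuffer.add(ts); }
    void addRight(long ts) { rightBuffer.add(ts); }

    void advanceLeftWatermark(long wm) { leftWatermark = Math.max(leftWatermark, wm); evict(); }
    void advanceRightWatermark(long wm) { rightWatermark = Math.max(rightWatermark, wm); evict(); }

    /** Event time of a two-input operator: the minimum of both input watermarks. */
    long combinedWatermark() { return Math.min(leftWatermark, rightWatermark); }

    int leftBuffered() { return leftBuffer.size(); }
    int rightBuffered() { return rightBuffer.size(); }

    private void evict() {
        long wm = combinedWatermark();
        // A left record can still meet right records with timestamps up to ts + upperBound.
        leftBuffer.removeIf(ts -> ts + upperBound < wm);
        // A right record can still meet left records with timestamps up to ts - lowerBound.
        rightBuffer.removeIf(ts -> ts - lowerBound < wm);
    }
}
```

Take bounds of -5/+5 ms and fast records at 100, 110 and 120. All three stay buffered while the slow watermark sits at 50. Once it reaches 112, only the record at 100 can be dropped. The longer the slow topic lags, the more fast-side records pile up in state.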

Best Regards

Re: Fast and slow stream sources for Interval Join

Posted by Gen Luo <lu...@gmail.com>.
Hi all,

A hacky but simple approach I have tried is to chain a map function to the
fast source. The function checks the timestamp of each input message: if the
current time minus the timestamp is less than a certain threshold, it sleeps
until the message is late enough, then passes it through. Subsequent messages
should then flow smoothly, with only occasional short sleeps. This works well
if the delay of the slow source is relatively stable.
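The throttling trick above might look roughly like the sketch below. It is plain Java with a hypothetical class name, DelayUntilLateEnough. The clock and sleeper are injectable so the timing can be checked without a cluster. In a job, the same check would sit inside a MapFunction chained right after the fast source, using System::currentTimeMillis and Thread::sleep as the real clock and sleeper. The threshold is an assumed value you would tune to the slow source's typical lag.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper for the "delay the fast source" trick: hold each record until
 * at least minDelayMillis of wall-clock time has passed since its event timestamp.
 */
class DelayUntilLateEnough {
    /** Abstraction over Thread::sleep so tests can use a fake clock. */
    interface Sleeper { void sleep(long millis) throws InterruptedException; }

    private final long minDelayMillis;
    private final LongSupplier clock;
    private final Sleeper sleeper;

    DelayUntilLateEnough(long minDelayMillis, LongSupplier clock, Sleeper sleeper) {
        this.minDelayMillis = minDelayMillis;
        this.clock = clock;
        this.sleeper = sleeper;
    }

    /** Production wiring: real wall clock and real sleeping. */
    static DelayUntilLateEnough withSystemClock(long minDelayMillis) {
        return new DelayUntilLateEnough(minDelayMillis, System::currentTimeMillis, Thread::sleep);
    }

    /** Blocks until the record is "late enough"; returns the requested sleep in ms (0 if none). */
    long awaitLateEnough(long eventTimestampMillis) {
        long waitMillis = eventTimestampMillis + minDelayMillis - clock.getAsLong();
        if (waitMillis <= 0) {
            return 0; // already old enough: pass straight through
        }
        try {
            sleeper.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag, e.g. on task cancellation
        }
        return waitMillis;
    }
}
```

Because the sleep blocks the task thread, it backpressures the fast source, which is the goal. It can also delay checkpoint barriers, though. The check also compares wall-clock time with event time, so it only helps while the slow source's lag stays roughly constant.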

On Tue, Feb 28, 2023 at 5:24 PM Schwalbe Matthias <Matthias.Schwalbe@viseca.ch> wrote:

RE: Fast and slow stream sources for Interval Join

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi All,

Another option to consider (and this is more of a question 😊) is to

  *   Implement org.apache.flink.streaming.api.operators.InputSelectable in the join operator
  *   And manually control backpressure on the inputs that run ahead in watermark time

I’m not sure where exactly to implement this or whether it would work; it's just an idea.
As was also said for watermark alignment, you would still need state to buffer the fast events, but not as much as in the unaligned case.
If this works, you could control backpressure and watermarking for a single operator without forcing the whole job to adopt aligned watermarks.
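Here is a minimal sketch of the selection policy this idea implies, assuming a fixed maximum skew between the two inputs' watermarks. It is plain Java with hypothetical names. A real operator would implement InputSelectable and return InputSelection.FIRST, SECOND or ALL from nextSelection(). It would receive the watermarks via processWatermark1/processWatermark2. The sketch does not handle an idle slow input, which would stall the operator.

```java
/**
 * Hypothetical model of an input-selection policy: stop reading the input whose
 * watermark is more than maxSkewMillis ahead of the other one.
 */
class WatermarkSkewSelector {
    /** Stand-in for Flink's InputSelection.FIRST / SECOND / ALL. */
    enum Selection { FIRST, SECOND, ALL }

    private final long maxSkewMillis;
    private long firstWatermark = Long.MIN_VALUE;
    private long secondWatermark = Long.MIN_VALUE;

    WatermarkSkewSelector(long maxSkewMillis) { this.maxSkewMillis = maxSkewMillis; }

    void onFirstWatermark(long wm) { firstWatermark = Math.max(firstWatermark, wm); }
    void onSecondWatermark(long wm) { secondWatermark = Math.max(secondWatermark, wm); }

    Selection nextSelection() {
        // Until both inputs have reported a watermark there is nothing to compare: read both.
        if (firstWatermark == Long.MIN_VALUE || secondWatermark == Long.MIN_VALUE) {
            return Selection.ALL;
        }
        if (firstWatermark - secondWatermark > maxSkewMillis) {
            return Selection.SECOND; // first input is running ahead: read only the slow one
        }
        if (secondWatermark - firstWatermark > maxSkewMillis) {
            return Selection.FIRST;  // second input is running ahead: read only the slow one
        }
        return Selection.ALL;
    }
}
```

Only records that the slow input has not yet "covered" with its watermark stay buffered, bounded roughly by maxSkewMillis worth of fast-side data.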

What do you think?

Regards

Thias


From: Alexis Sarda-Espinosa <sa...@gmail.com>
Sent: Tuesday, February 28, 2023 7:57 AM
To: Mason Chen <ma...@gmail.com>
Cc: Remigiusz Janeczek <ca...@gmail.com>; user <us...@flink.apache.org>
Subject: Re: Fast and slow stream sources for Interval Join

This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.

Re: Fast and slow stream sources for Interval Join

Posted by Alexis Sarda-Espinosa <sa...@gmail.com>.
Hi Mason,

Very interesting. Is it possible to apply both types of alignment, i.e.,
considering watermark skew both across the splits within one source and
across different sources?

Regards,
Alexis.

On Tue, 28 Feb 2023, 05:26 Mason Chen, <ma...@gmail.com> wrote:

Re: Fast and slow stream sources for Interval Join

Posted by Mason Chen <ma...@gmail.com>.
Hi all,

It's true that the problem can be handled by caching records in state.
However, there is an alternative: `watermark alignment`, available in Flink
1.15+ [1], which performs the synchronization you described while requiring
less state than the former approach.

To use this with two topics of different speeds, you would need to define
two Kafka sources, one per topic. This limitation is documented in [1].
Flink 1.17 resolves it with split-level (partition-level, in the case of
Kafka) watermark alignment, so a single Kafka source reading several topics
can align on the partitions of the different topics.
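Per the linked docs, alignment is enabled on the WatermarkStrategy with withWatermarkAlignment(group, maxAllowedWatermarkDrift, updateInterval). A source (or, from 1.17, a single split) is then paused while its watermark is more than maxAllowedWatermarkDrift ahead of the lowest watermark in its alignment group. The plain-Java model below only illustrates that pause rule. It uses hypothetical names and is not Flink code. The ids can stand for whole sources or for individual partitions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical model of the watermark-alignment pause rule: anything whose watermark is
 * more than maxAllowedDriftMillis ahead of the group's lowest watermark is paused.
 * Ids may stand for whole sources (1.15+) or single partitions/splits (1.17+).
 */
class WatermarkAlignmentModel {
    private final long maxAllowedDriftMillis;
    private final Map<String, Long> watermarks = new HashMap<>();

    WatermarkAlignmentModel(long maxAllowedDriftMillis) {
        this.maxAllowedDriftMillis = maxAllowedDriftMillis;
    }

    /** Records the latest watermark for a source or split; watermarks never go backwards. */
    void report(String id, long watermark) {
        watermarks.merge(id, watermark, Math::max);
    }

    /** Highest watermark any member may have before being paused. */
    long maxAllowedWatermark() {
        OptionalLong min = watermarks.values().stream().mapToLong(Long::longValue).min();
        return min.isPresent() ? min.getAsLong() + maxAllowedDriftMillis : Long.MAX_VALUE;
    }

    /** Members that should stop reading until the slower ones catch up. */
    Set<String> paused() {
        long limit = maxAllowedWatermark();
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Long> e : watermarks.entrySet()) {
            if (e.getValue() > limit) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

With a 1 s drift, a fast source at 60 s is paused against a slow one at 45 s. At split level, only the partition that is too far ahead is paused while its sibling keeps reading.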

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_

Best,
Mason

On Mon, Feb 27, 2023 at 8:11 AM Alexis Sarda-Espinosa <sarda.espinosa@gmail.com> wrote:

Re: Fast and slow stream sources for Interval Join

Posted by Alexis Sarda-Espinosa <sa...@gmail.com>.
Hello,

I had this question myself and I've seen it asked a few times; the answer is
always the same: there's currently no official way to handle it without
state.

Regards,
Alexis.

On Mon, 27 Feb 2023, 14:09 Remigiusz Janeczek, <ca...@gmail.com> wrote:
