Posted to user@flink.apache.org by Gen Luo <lu...@gmail.com> on 2023/03/02 08:32:11 UTC

Re: Fast and slow stream sources for Interval Join

Hi all,

A hacky but simple approach I have tried is to chain a map function to the
fast source. The function checks the timestamp of each input message; if the
current time minus the timestamp is less than a certain threshold, it sleeps
until the message is late enough, then passes it through. Subsequent messages
should then flow smoothly, with only occasional short sleeps. This works well
if the delay of the slow source is relatively stable.
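A minimal sketch of the delaying map step described above, written as plain Python so it runs outside a Flink job. The class `DelayGate`, its `min_delay_ms` threshold (assumed to be set to roughly the slow source's usual lag), and the injectable clock and sleep functions are hypothetical. In a real job the `map` body would sit inside a map function chained to the fast source, where blocking the task thread also holds back that source.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class DelayGate:
    """Hypothetical helper: holds each record back until it is at least
    `min_delay_ms` old (current time minus its event timestamp)."""

    def __init__(
        self,
        min_delay_ms: int,
        clock_ms: Callable[[], int] = lambda: int(time.time() * 1000),
        sleep_s: Callable[[float], None] = time.sleep,
    ) -> None:
        self.min_delay_ms = min_delay_ms  # assumed: about the slow source's usual lag
        self.clock_ms = clock_ms          # injectable wall clock (milliseconds)
        self.sleep_s = sleep_s            # injectable sleep (seconds), e.g. for tests

    def remaining_delay_ms(self, event_ts_ms: int) -> int:
        """How much longer the record must wait; 0 if it is already late enough."""
        age_ms = self.clock_ms() - event_ts_ms
        return max(0, self.min_delay_ms - age_ms)

    def map(self, record: T, event_ts_ms: int) -> T:
        """Sleep if the record is too fresh, then pass it through unchanged."""
        wait_ms = self.remaining_delay_ms(event_ts_ms)
        if wait_ms > 0:
            self.sleep_s(wait_ms / 1000.0)
        return record
```

Because a source's records usually arrive roughly in timestamp order, typically only the first record after a burst needs a noticeable sleep; by the time it is released, the following records are already old enough or wait only briefly.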

On Tue, Feb 28, 2023 at 5:24 PM Schwalbe Matthias <
Matthias.Schwalbe@viseca.ch> wrote:

> Hi All,
>
> Another option to consider (and this is more a question 😊) is to:
>
>    - Implement org.apache.flink.streaming.api.operators.InputSelectable
>    in the join operator, and
>    - Manually apply backpressure to whichever input is running ahead in
>    watermark time.
>
> I'm not sure exactly where this would be implemented or whether it would
> work; it's just an idea.
>
> As with watermark alignment, you would still need state to buffer the
> fast events, but not as much as in the unaligned case.
>
> If this works you could control backpressure and watermarking for a single
> operator without forcing the whole job to adopt aligned watermarks.
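The decision such an operator would have to make can be sketched as follows. In Flink, an operator implementing `InputSelectable` is asked which input to read next through `nextSelection()`, which returns an `InputSelection` such as `FIRST`, `SECOND` or `ALL`; the plain-Python model below only mirrors that choice. The class `WatermarkBalancedSelector`, the `max_skew_ms` parameter and the stand-in `Selection` enum are hypothetical, and how the operator would feed watermarks into it is left open, as in the idea above.

```python
from enum import Enum
from typing import List, Optional


class Selection(Enum):
    """Stand-in for Flink's InputSelection constants."""
    FIRST = 1
    SECOND = 2
    ALL = 3


class WatermarkBalancedSelector:
    """Hypothetical input chooser for a two-input join: stop reading an input
    whose watermark is more than `max_skew_ms` ahead of the other one."""

    def __init__(self, max_skew_ms: int) -> None:
        self.max_skew_ms = max_skew_ms
        self.wm: List[Optional[int]] = [None, None]  # watermarks of input 1 and 2

    def on_watermark(self, input_index: int, watermark_ms: int) -> None:
        """Record a watermark arriving on input 1 or input 2."""
        self.wm[input_index - 1] = watermark_ms

    def next_selection(self) -> Selection:
        first, second = self.wm
        if first is None or second is None:
            return Selection.ALL      # no basis for comparison yet: read both
        if first - second > self.max_skew_ms:
            return Selection.SECOND   # input 1 ran ahead: read only input 2
        if second - first > self.max_skew_ms:
            return Selection.FIRST    # input 2 ran ahead: read only input 1
        return Selection.ALL
```

Not selecting an input stops reading from it, so its upstream tasks are backpressured; the join state then only has to hold roughly `max_skew_ms` worth of the fast input.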
>
> What do you think?
>
> Regards
>
> Thias
>
> *From:* Alexis Sarda-Espinosa <sa...@gmail.com>
> *Sent:* Tuesday, February 28, 2023 7:57 AM
> *To:* Mason Chen <ma...@gmail.com>
> *Cc:* Remigiusz Janeczek <ca...@gmail.com>; user <us...@flink.apache.org>
> *Subject:* Re: Fast and slow stream sources for Interval Join
>
> Hi Mason,
>
> Very interesting. Is it possible to apply both types of alignment at
> once, i.e., to account for watermark skew across splits within one source
> and also across different sources?
>
> Regards,
>
> Alexis.
>
> On Tue, 28 Feb 2023, 05:26 Mason Chen, <ma...@gmail.com> wrote:
>
> Hi all,
>
> It's true that the problem can be handled by buffering records in state.
> However, Flink 1.15+ offers an alternative, watermark alignment [1], which
> provides the synchronization you described while keeping the state smaller
> than with the buffering approach.
>
> To use it with two topics of different speeds, you would need to define
> two Kafka sources, one per topic; this limitation is documented in [1].
> Flink 1.17 resolves it with split-level (partition-level, in the case of
> Kafka) watermark alignment, so a single Kafka source reading several
> topics can align across the partitions of those topics.
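A simplified model of the rule that watermark alignment enforces: members of one alignment group report their watermarks, and a member that is more than the allowed drift ahead of the slowest member pauses reading until the others catch up. This is plain Python for illustration, not Flink's implementation; `AlignmentGroup`, `may_read` and the member names are hypothetical.

```python
from typing import Dict


class AlignmentGroup:
    """Hypothetical model of one watermark alignment group."""

    def __init__(self, max_drift_ms: int) -> None:
        self.max_drift_ms = max_drift_ms
        self.watermarks: Dict[str, int] = {}  # member (source or split) -> watermark

    def report(self, member: str, watermark_ms: int) -> None:
        """A source (or, with split-level alignment, a split) reports progress."""
        self.watermarks[member] = watermark_ms

    def may_read(self, member: str) -> bool:
        """True while `member` is at most `max_drift_ms` ahead of the slowest member."""
        if member not in self.watermarks:
            return True  # nothing reported yet, so nothing to hold back
        group_min = min(self.watermarks.values())
        return self.watermarks[member] <= group_min + self.max_drift_ms
```

In a job, per [1], each source's `WatermarkStrategy` is given the same group name, for example `.withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1))`, where the second argument is the maximum allowed drift and the third controls how often the alignment is updated; check [1] for the exact signature in your Flink version.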
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_
>
> Best,
>
> Mason
>
> On Mon, Feb 27, 2023 at 8:11 AM Alexis Sarda-Espinosa <
> sarda.espinosa@gmail.com> wrote:
>
> Hello,
>
> I had this question myself and I've seen it asked a few times; the answer
> is always the same: there's currently no official way to handle it without
> state.
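The state-based approach, and why it grows when one topic lags, can be seen in a simplified model of the fast side's buffer in an interval join. A fast-side record at time `l` matches a slow-side record at time `r` when `l + lower_ms <= r <= l + upper_ms`, and it can only be dropped once the operator's watermark, the minimum of both inputs' watermarks, has reached `l + upper_ms`. This is an illustrative Python sketch with hypothetical names, not Flink's `IntervalJoinOperator`; it ignores keys and the slow side's own buffer.

```python
from typing import List, Tuple


class IntervalJoinBuffer:
    """Hypothetical model of the fast (left) side's buffer in an interval join."""

    def __init__(self, lower_ms: int, upper_ms: int) -> None:
        self.lower_ms = lower_ms
        self.upper_ms = upper_ms
        self.left: List[Tuple[int, str]] = []  # buffered (timestamp, value) pairs
        self.left_wm = float("-inf")           # latest watermark of the fast input
        self.right_wm = float("-inf")          # latest watermark of the slow input

    def add_left(self, ts: int, value: str) -> None:
        """Buffer a fast-side record until no slow-side record can match it."""
        self.left.append((ts, value))

    def match_right(self, ts: int) -> List[str]:
        """Buffered fast-side values that a slow-side record at `ts` joins with."""
        return [v for (l, v) in self.left
                if l + self.lower_ms <= ts <= l + self.upper_ms]

    def advance(self, left_wm: int, right_wm: int) -> None:
        """Update both watermarks and evict records that can no longer match."""
        self.left_wm = max(self.left_wm, left_wm)
        self.right_wm = max(self.right_wm, right_wm)
        combined = min(self.left_wm, self.right_wm)
        # Later slow-side records have timestamps above `combined`, so a record
        # whose match window ends at or before it can be dropped.
        self.left = [(l, v) for (l, v) in self.left if l + self.upper_ms > combined]
```

As long as the slow side's watermark stays behind, the combined watermark does not move and every fast-side record stays buffered, which is the memory growth the original question is concerned about.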
>
> Regards,
>
> Alexis.
>
> On Mon, 27 Feb 2023, 14:09 Remigiusz Janeczek, <ca...@gmail.com> wrote:
>
> Hi,
>
> How should I handle a case where one of the Kafka topics used in an
> interval join is slower than the other (or where one topic lags behind)?
>
> Is there a way to stop consuming from the fast topic and wait for the slow
> one to catch up? I want to avoid running out of memory (or keeping very
> large state), and I don't want to discard any data from the fast topic
> until a watermark from the slow topic allows it.
>
> Best Regards