Posted to dev@flink.apache.org by Xuannan Su <su...@gmail.com> on 2023/06/25 07:02:00 UTC

[DISCUSS] FLIP-326: Enhance Watermark to Support Processing-Time Temporal Join

Hi all,

Dong (cc'ed) and I are opening this thread to discuss our proposal,
documented in FLIP-326 [1], to enhance the watermark so that it
properly supports processing-time temporal joins.

By enhancing the expressiveness of the Watermark, we want to support
the use case where records from the probe side of a processing-time
temporal join need to wait until the build side finishes its snapshot
phase. Additionally, these changes lay the groundwork for simplifying
the DataStream APIs, eliminating the need for users to explicitly
differentiate between event-time and processing-time, resulting in a
more intuitive user experience.
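To make the proposed behavior concrete, here is a minimal Java sketch of how a probe-side wait could work. It assumes a hypothetical `Watermark` record carrying a `useProcessingTime` flag (the flag name comes from later in this thread) and a hypothetical `ProcessingTimeJoinSketch` operator. Neither is Flink's actual API, and the FLIP document remains the authoritative description of the design.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical watermark carrying the useProcessingTime flag discussed in this thread.
// This is NOT Flink's own Watermark class.
record Watermark(long timestamp, boolean useProcessingTime) {
    static Watermark eventTime(long timestamp) {
        return new Watermark(timestamp, false);
    }

    // Assumption: a processing-time watermark is a one-time signal; its timestamp is unused.
    static Watermark processingTime() {
        return new Watermark(Long.MAX_VALUE, true);
    }
}

// Hypothetical processing-time temporal join: probe-side records are held back until the
// build side signals (via useProcessingTime=true) that its snapshot phase is complete.
class ProcessingTimeJoinSketch {
    private final Map<String, String> buildState = new HashMap<>();
    private final List<String> pendingProbe = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private boolean snapshotDone = false;

    void onBuildRecord(String key, String value) {
        buildState.put(key, value); // keep the latest build-side value per key
    }

    void onBuildWatermark(Watermark wm) {
        if (wm.useProcessingTime() && !snapshotDone) {
            snapshotDone = true;
            // Snapshot complete: join every probe record that arrived too early.
            pendingProbe.forEach(this::joinAndEmit);
            pendingProbe.clear();
        }
        // Event-time watermarks are simply ignored in this sketch.
    }

    void onProbeRecord(String key) {
        if (snapshotDone) {
            joinAndEmit(key);
        } else {
            pendingProbe.add(key);
        }
    }

    private void joinAndEmit(String key) {
        output.add(key + "=" + buildState.getOrDefault(key, "null"));
    }

    List<String> output() {
        return output;
    }
}
```

In this sketch, a probe record that arrives before the build side signals `useProcessingTime=true` is joined against the completed snapshot instead of a partial one. The join itself is reduced to a map lookup for brevity.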

Please refer to the FLIP document for more details about the proposed
design and implementation. We welcome any feedback and opinions on
this proposal.

Best regards,

Dong and Xuannan

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-326%3A+Enhance+Watermark+to+Support+Processing-Time+Temporal+Join

Re: [DISCUSS] FLIP-326: Enhance Watermark to Support Processing-Time Temporal Join

Posted by Jing Ge <ji...@ververica.com.INVALID>.
Hi Dong,

Thanks for your proposal. It is a very interesting feature and also a
complex one, especially the "shotgun surgery" [1] needed to handle the
useProcessingTime logic. While reading the FLIP, I was wondering whether
it is possible to leverage the Visitor design pattern (not necessarily
applying the pattern verbatim) to encapsulate the logic centrally in the
Watermark or the Visitors, i.e. to achieve high cohesion?
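Here is one way to picture the suggestion. The sketch assumes a hypothetical sealed `EnhancedWatermark` hierarchy and a `WatermarkVisitor` interface. These names are invented here and are not part of the FLIP or of Flink. Each piece of operator-specific behavior lives in one visitor, so the event-time vs. processing-time branching is not repeated at every call site.

```java
// Hypothetical visitor over the two kinds of watermark discussed in FLIP-326.
interface WatermarkVisitor<R> {
    R visitEventTime(long timestamp);

    R visitProcessingTime();
}

// Hypothetical watermark hierarchy: each variant dispatches itself, so callers never
// branch on a useProcessingTime flag directly.
sealed interface EnhancedWatermark permits EventTimeWatermark, ProcessingTimeWatermark {
    <R> R accept(WatermarkVisitor<R> visitor);
}

record EventTimeWatermark(long timestamp) implements EnhancedWatermark {
    public <R> R accept(WatermarkVisitor<R> visitor) {
        return visitor.visitEventTime(timestamp);
    }
}

record ProcessingTimeWatermark() implements EnhancedWatermark {
    public <R> R accept(WatermarkVisitor<R> visitor) {
        return visitor.visitProcessingTime();
    }
}

// Example visitor: the time up to which timers may fire, given the current wall clock.
class TimerFiringVisitor implements WatermarkVisitor<Long> {
    private final long wallClockMillis;

    TimerFiringVisitor(long wallClockMillis) {
        this.wallClockMillis = wallClockMillis;
    }

    @Override
    public Long visitEventTime(long timestamp) {
        return timestamp; // event time: fire timers up to the watermark's timestamp
    }

    @Override
    public Long visitProcessingTime() {
        return wallClockMillis; // processing time: fire timers up to the wall clock
    }
}
```

With this structure, adding a third watermark kind would force every visitor to handle it at compile time. The trade-off is one more level of indirection compared with checking a boolean flag.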

Best regards,
Jing


[1] https://en.wikipedia.org/wiki/Shotgun_surgery


On Wed, Jul 26, 2023 at 1:46 AM David Anderson <da...@apache.org> wrote:

> Dong,
>
> Thank you for the careful analysis of my proposal. Your conclusions
> make sense to me.
>
> David

Re: [DISCUSS] FLIP-326: Enhance Watermark to Support Processing-Time Temporal Join

Posted by David Anderson <da...@apache.org>.
Dong,

Thank you for the careful analysis of my proposal. Your conclusions
make sense to me.

David

On Mon, Jul 24, 2023 at 8:37 PM Dong Lin <li...@gmail.com> wrote:
>
> Hi David,
>
> Thank you for the detailed comments and the suggestion of this alternative approach.
>
> I agree with you that this alternative can also address the target use-case with the same correctness. In comparison to the current FLIP, this alternative indeed introduces much less complexity to the Flink runtime internal implementation.
>
> At a high level, this alternative simulates a one-time emission of Watermark(useProcessingTime=true) with periodic emission of Watermark(timestamp=wall-clock-time).
>
> One downside of this alternative is that it can introduce a bit of extra per-record runtime overhead. This is because the ingestion-time watermark will be emitted periodically according to pipeline.auto-watermark-interval (200 ms by default). Thus there is still a short period where the watermark from the HybridSource can lag behind wall-clock time. Operators whose logic depends on the watermark, such as TemporalRowTimeJoinOperator, will need to check the build-side watermark and delay/buffer probe-side records until they receive the next ingestion-time watermark.
>
> The impact of this overhead probably depends on the throughput/watermark of the probe-side records. On the other hand, given that the join operator is typically already heavy (due to state backend access and build-side buffering), and the watermark from the probe side (e.g. Kafka) is probably also lagging behind wall-clock time, it is probably not an issue in most cases. Therefore I agree that it is worth trying this approach. We can revisit this if we encounter any issues around performance or usability of this approach.
>
> Another potential concern is that it requires the user to use ingestion time. I am not sure we are able to do this in a backward-compatible way yet. We probably need to go through the existing APIs around ingestion time watermark to validate this.
>
> BTW, with the introduction of RecordAttributes(isBacklog=true/false) from FLIP-327, another short-term approach is to let TemporalProcessTimeJoinOperator keep buffering records from MySQL/HybridSource as long as isBacklog=true, and process them in a processing-time manner once it receives isBacklog=false. This should also address the use case targeted by FLIP-326. The only caveat with this approach is that it is a bit hacky, because it requires JoinOperator to always buffer records when isBacklog=true, whereas the semantics of isBacklog only say that buffering records is "optional", which can be an issue in the long term.
>
> Thanks,
> Dong

Re: [DISCUSS] FLIP-326: Enhance Watermark to Support Processing-Time Temporal Join

Posted by Dong Lin <li...@gmail.com>.
Hi David,

Thank you for the detailed comments and the suggestion of this alternative
approach.

I agree with you that this alternative can also address the target use-case
with the same correctness. In comparison to the current FLIP, this
alternative indeed introduces much less complexity to the Flink runtime
internal implementation.

At a high level, this alternative simulates a one-time emission of
Watermark(useProcessingTime=true) with periodic emission of
Watermark(timestamp=wall-clock-time).

One downside of this alternative is that it can introduce a bit of extra
per-record runtime overhead. This is because the ingestion-time watermark
will be emitted periodically according to pipeline.auto-watermark-interval
(200 ms by default). Thus there is still a short period where the watermark
from the HybridSource can lag behind wall-clock time. Operators whose logic
depends on the watermark, such as TemporalRowTimeJoinOperator, will need to
check the build-side watermark and delay/buffer probe-side records until
they receive the next ingestion-time watermark.
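The buffering described above can be illustrated with a simplified simulation. The simulation makes three assumptions. First, probe-side records are stamped with ingestion time. Second, the build side emits a watermark equal to the wall clock once per interval (200 ms here, mirroring the default `pipeline.auto-watermark-interval` mentioned above). Third, a buffered record is released once the build-side watermark is greater than or equal to its timestamp. `IngestionTimeBufferSketch` is a hypothetical class, not `TemporalRowTimeJoinOperator`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Simplified, hypothetical simulation (not Flink code) of a watermark-driven join input:
// probe records carry ingestion timestamps and are released only once the periodically
// emitted build-side watermark has reached them.
class IngestionTimeBufferSketch {
    // Each entry is {recordTimestamp, arrivalWallClock}, ordered by record timestamp.
    private final PriorityQueue<long[]> buffered =
            new PriorityQueue<>(Comparator.comparingLong((long[] entry) -> entry[0]));
    private final List<Long> waitMillis = new ArrayList<>();
    private long buildWatermark = Long.MIN_VALUE;

    void onProbeRecord(long timestamp, long wallClock) {
        if (timestamp <= buildWatermark) {
            waitMillis.add(0L); // build side has already caught up: no waiting
        } else {
            buffered.add(new long[] {timestamp, wallClock});
        }
    }

    void onBuildWatermark(long watermark, long wallClock) {
        buildWatermark = Math.max(buildWatermark, watermark);
        // Release every buffered probe record the watermark now covers, recording its wait.
        while (!buffered.isEmpty() && buffered.peek()[0] <= buildWatermark) {
            long[] entry = buffered.poll();
            waitMillis.add(wallClock - entry[1]);
        }
    }

    List<Long> waitMillis() {
        return waitMillis;
    }

    int bufferedCount() {
        return buffered.size();
    }
}
```

For example, with watermarks at 0, 200 and 400 ms and probe records arriving at 50, 150 and 390 ms, the recorded waits are 150, 50 and 10 ms. Under these assumptions, each probe record waits at most one watermark interval. This is the extra per-record latency whose impact is weighed in the next paragraph.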

The impact of this overhead probably depends on the throughput/watermark of
the probe-side records. On the other hand, given that the join operator is
typically already heavy (due to state backend access and build-side
buffering), and the watermark from the probe side (e.g. Kafka) is probably
also lagging behind wall-clock time, it is probably not an issue in most
cases. Therefore I agree that it is worth trying this approach. We can
revisit this if we encounter any issues around performance or usability of
this approach.

Another potential concern is that it requires the user to use ingestion
time. I am not sure we are able to do this in a backward-compatible way
yet. We probably need to go through the existing APIs around ingestion time
watermark to validate this.

BTW, with the introduction of RecordAttributes(isBacklog=true/false) from
FLIP-327
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data>,
another short-term approach is to let TemporalProcessTimeJoinOperator keep
buffering records from MySQL/HybridSource as long as isBacklog=true, and
process them in a processing-time manner once it receives isBacklog=false.
This should also address the use case targeted by FLIP-326. The only caveat
with this approach is that it is a bit hacky, because it requires
JoinOperator to always buffer records when isBacklog=true, whereas the
semantics of isBacklog only say that buffering records is "optional", which
can be an issue in the long term.
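Below is a sketch of one possible reading of the isBacklog-based alternative. It assumes a hypothetical `RecordAttributes` record that models only the `isBacklog` flag from FLIP-327. It also assumes a hypothetical operator that holds probe-side records while the build input reports `isBacklog=true`. The message does not spell out exactly which records are buffered, so treat this as an illustration of the idea rather than the proposed implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for FLIP-327's RecordAttributes; only the isBacklog flag is modeled.
record RecordAttributes(boolean isBacklog) {}

// Hypothetical operator: while the build input reports isBacklog=true, probe records are held
// back; when isBacklog=false arrives, they are joined against the accumulated build state.
class BacklogAwareJoinSketch {
    private final Map<String, String> buildState = new HashMap<>();
    private final List<String> heldProbe = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private boolean buildBacklog = true; // assumption: start in backlog mode until told otherwise

    void onBuildAttributes(RecordAttributes attributes) {
        buildBacklog = attributes.isBacklog();
        if (!buildBacklog) {
            // Backlog finished: process every held probe record in processing-time manner.
            heldProbe.forEach(this::emit);
            heldProbe.clear();
        }
    }

    void onBuildRecord(String key, String value) {
        buildState.put(key, value);
    }

    void onProbeRecord(String key) {
        if (buildBacklog) {
            heldProbe.add(key); // this buffering must be mandatory for the join to be correct
        } else {
            emit(key);
        }
    }

    private void emit(String key) {
        output.add(key + "=" + buildState.getOrDefault(key, "null"));
    }

    List<String> output() {
        return output;
    }
}
```

Unlike a one-time `useProcessingTime` signal, `isBacklog` can switch back to `true`. In this sketch, the operator would then resume holding probe records. The join's correctness depends on that buffering always happening, which is the mismatch with the "optional" semantics noted above.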

Thanks,
Dong

On Tue, Jul 25, 2023 at 2:37 AM David Anderson <da...@apache.org> wrote:

> I'm delighted to see interest in developing support for
> processing-time temporal joins.
>
> The proposed implementation seems rather complex, and I'm not
> convinced this complexity is justified/necessary. I'd like to outline
> a simpler alternative that I think would satisfy the key objectives.
>
> Key ideas:
>
> 1. Limit support to the HybridSource (or a derivative thereof). (E.g.,
> I'm guessing the MySQL CDC Source could be reworked to be a hybrid
> source.)
> 2. Have this HybridSource wait to begin emitting watermarks until it
> has handled all events from the bounded sources. (I'm not sure how the
> HybridSource handles this now; if this is an incompatible change, we
> can find a way to deal with that.)
> 3. Instruct users to use an ingestion time watermarking strategy for
> their unbounded source (the source the HybridSource handles last) if
> they want to do something like a processing time temporal join.
>
> One objection to this is the limitation of only supporting the
> HybridSource -- what about cases where the user has a single source,
> e.g., a Kafka topic? I'm suggesting the user would divide their
> build-side stream into two parts -- a bounded component that is fully
> ingested by the hybrid source before watermarking begins, followed by
> an unbounded component.
>
> I think this alternative handles use cases like processing-time
> temporal join rather nicely, without requiring any changes to
> watermarks or the core runtime.
>
> David

Re: [DISCUSS] FLIP-326: Enhance Watermark to Support Processing-Time Temporal Join

Posted by David Anderson <da...@apache.org>.
I'm delighted to see interest in developing support for
processing-time temporal joins.

The proposed implementation seems rather complex, and I'm not
convinced this complexity is justified/necessary. I'd like to outline
a simpler alternative that I think would satisfy the key objectives.

Key ideas:

1. Limit support to the HybridSource (or a derivative thereof). (E.g.,
I'm guessing the MySQL CDC Source could be reworked to be a hybrid
source.)
2. Have this HybridSource wait to begin emitting watermarks until it
has handled all events from the bounded sources. (I'm not sure how the
HybridSource handles this now; if this is an incompatible change, we
can find a way to deal with that.)
3. Instruct users to use an ingestion time watermarking strategy for
their unbounded source (the source the HybridSource handles last) if
they want to do something like a processing time temporal join.

One objection to this is the limitation of only supporting the
HybridSource -- what about cases where the user has a single source,
e.g., a Kafka topic? I'm suggesting the user would divide their
build-side stream into two parts -- a bounded component that is fully
ingested by the hybrid source before watermarking begins, followed by
an unbounded component.
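The key ideas above can be modeled with a small simulation. It assumes a hypothetical `HybridSourceSketch` (not Flink's `HybridSource` API) that forwards all bounded records without emitting any watermark. It then stamps each unbounded record with an injected clock (an ingestion-time strategy) and emits a watermark at that time. Real ingestion-time watermarks would be emitted periodically rather than per record, so the per-record watermark here is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical model of the proposed source behavior (not Flink's HybridSource API).
class HybridSourceSketch {
    sealed interface Event permits Rec, Wm {}

    // A data record; bounded-phase records use -1 as a placeholder timestamp in this sketch.
    record Rec(String value, long timestamp) implements Event {}

    // A watermark.
    record Wm(long timestamp) implements Event {}

    static List<Event> run(List<String> bounded, List<String> unbounded, LongSupplier clock) {
        List<Event> out = new ArrayList<>();
        // Bounded phase: forward everything and emit no watermarks at all.
        for (String value : bounded) {
            out.add(new Rec(value, -1L));
        }
        // Unbounded phase: ingestion-time strategy, i.e. stamp each record with the clock
        // and (for simplicity) follow it immediately with a watermark at that time.
        for (String value : unbounded) {
            long now = clock.getAsLong();
            out.add(new Rec(value, now));
            out.add(new Wm(now));
        }
        return out;
    }
}
```

Downstream, the first watermark can only appear after every bounded record has been emitted. A watermark-driven temporal join would therefore not advance past the build side before the bounded part has been fully ingested.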

I think this alternative handles use cases like processing-time
temporal join rather nicely, without requiring any changes to
watermarks or the core runtime.

David

On Thu, Jun 29, 2023 at 1:39 AM Martijn Visser <ma...@apache.org> wrote:
>
> Hi Dong and Xuannan,
>
> I'm excited to see this FLIP. I think support for processing-time
> temporal joins is something that Flink users will greatly benefit
> from. I specifically want to call out that it's great to see the use
> cases that this enables. From a technical implementation perspective,
> I defer to the opinion of others with expertise on this topic.
>
> Best regards,
>
> Martijn

Re: [DISCUSS] FLIP-326: Enhance Watermark to Support Processing-Time Temporal Join

Posted by Martijn Visser <ma...@apache.org>.
Hi Dong and Xuannan,

I'm excited to see this FLIP. I think support for processing-time
temporal joins is something that Flink users will greatly benefit
from. I specifically want to call out that it's great to see the use
cases that this enables. From a technical implementation perspective,
I defer to the opinion of others with expertise on this topic.

Best regards,

Martijn

On Sun, Jun 25, 2023 at 9:03 AM Xuannan Su <su...@gmail.com> wrote:
>
> Hi all,
>
> Dong (cc'ed) and I are opening this thread to discuss our proposal,
> documented in FLIP-326 [1], to enhance the watermark so that it
> properly supports processing-time temporal joins.
>
> By enhancing the expressiveness of the Watermark, we want to support
> the use case where records from the probe side of a processing-time
> temporal join need to wait until the build side finishes its snapshot
> phase. Additionally, these changes lay the groundwork for simplifying
> the DataStream APIs, eliminating the need for users to explicitly
> differentiate between event-time and processing-time, resulting in a
> more intuitive user experience.
>
> Please refer to the FLIP document for more details about the proposed
> design and implementation. We welcome any feedback and opinions on
> this proposal.
>
> Best regards,
>
> Dong and Xuannan
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-326%3A+Enhance+Watermark+to+Support+Processing-Time+Temporal+Join