Posted to user@flink.apache.org by David Christle via user <us...@flink.apache.org> on 2023/01/13 07:02:57 UTC

HybridSource: how to specify different watermark strategies for underlying sources

Hello,

I'm trying to use the HybridSource to read from a bounded FLIP-27 Iceberg
source and an unbounded FLIP-27 PubSub source. Based on my searches, the
most common patterns for assigning timestamps and watermarks with FLIP-27
sources are either via the fromSource method in
StreamExecutionEnvironment, which returns a DataStreamSource (a subclass of
DataStream), or using assignTimestampsAndWatermarks(WatermarkStrategy wms) on
an already existing DataStream.

But the only way to build a HybridSource, it seems, is via the addSource
method of its builder, which accepts only Source types. We cannot call an
analogue of assignTimestampsAndWatermarks on a Source.

We could perhaps call that method on the DataStream produced by
env.fromSource(hybridSource), but I believe that would use the same
timestamp & watermark assignment strategy for both. We need the watermarks
to be assigned differently based on the source. Our Iceberg tables are
partitioned by day, so the watermark lags by about 1 day when we use
event-time aligned assignment until it finishes. Those watermarks originate
within the Source. But for PubSub, we'd like to use the generic bounded
out-of-orderness watermark strategy for just a few seconds of
out-of-orderness. The PubSub source doesn't generate watermarks within the
source -- there is no per-split information exposed by PubSub that would
make a better watermark, like in the Kafka connector. The only way to
assign it a watermark strategy is via the DataStream-level assignment
methods.
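
For readers less familiar with the bounded out-of-orderness strategy
mentioned above, here is a minimal standalone Java sketch of the arithmetic
involved. It deliberately has no Flink dependency: BoundedLagTracker and
BoundedLagDemo are hypothetical names invented for this illustration, and
the 5-second bound is an assumed value. In a real job one would use
WatermarkStrategy.forBoundedOutOfOrderness(Duration) rather than this class;
the sketch only mirrors the idea that the watermark trails the largest
timestamp seen so far by a fixed bound.

```java
import java.time.Duration;

/** Hypothetical, Flink-free sketch of a bounded out-of-orderness tracker. */
final class BoundedLagTracker {
    private final long lagMillis;
    private long maxTimestamp;

    BoundedLagTracker(Duration maxOutOfOrderness) {
        this.lagMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the subtraction in currentWatermark() cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + lagMillis + 1;
    }

    /** Records one event timestamp (assumed to be epoch milliseconds). */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Events with a timestamp at or below this value are assumed to have arrived. */
    long currentWatermark() {
        return maxTimestamp - lagMillis - 1;
    }
}

final class BoundedLagDemo {
    public static void main(String[] args) {
        BoundedLagTracker tracker = new BoundedLagTracker(Duration.ofSeconds(5));
        tracker.onEvent(10_000L);
        tracker.onEvent(8_000L); // 2 s behind the maximum, inside the 5 s bound
        System.out.println(tracker.currentWatermark());
    }
}
```

With a 5-second bound, the watermark after these two events is 4999, so the
event stamped 8000 is still considered on time.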

Is the only way to use different watermark strategies to modify the PubSub
Source to do the assignment within the Source?

I wonder whether, given that it seems much more common to assign watermarks
at the DataStream level (e.g. FLIP-182's examples), HybridSource could be
modified to accept an optional WatermarkStrategy for each underlying
source, similar to how fromSource works.

Kind regards,
David

Re: HybridSource: how to specify different watermark strategies for underlying sources

Posted by Martijn Visser <ma...@apache.org>.
Hi David,

That's good to know. Perhaps it would be nice to see if we can move it all
forward into a new version. The externalization is nearly finished (I
pushed a release candidate out for the version that's currently bundled
with Flink 1.16), so let's see if we can move it forward. I'm also looping
in Ryan who has been helping out here too.

If you are able to confirm that it works, do let us know. It could also be
interesting as a blog post to share.

Best regards,

Martijn



On Wed, Jan 18, 2023 at 07:43, David Christle <
david.christle@discordapp.com> wrote:

> Hi Martijn,
>
> That's correct. We have a forked version of the PR from Ryan Skraba (
> https://github.com/apache/flink/pull/18823), which itself is originally
> based on the PR from Jakob Edding (
> https://github.com/apache/flink/pull/15152), and have incorporated an old
> PR (https://github.com/apache/flink/pull/9388) for retries from Richard
> Deurwaarder, the main author of the non-FLIP-27 PubSub source. Some changes
> suggested in code review & other cosmetic changes are also in it.
>
> I'm aware of the ongoing efforts to split out connectors into separate
> repositories, and I'd planned on opening a new PR somewhere -- either the
> flink repository, or a new one -- to contribute it upstream. Any
> suggestions on how to proceed? The forked code runs without issues on our
> production traffic.
>
> As far as the HybridSource & watermarking is concerned, I realized that I
> can use the switch timestamp to determine whether the underlying Iceberg or
> PubSub source is being used, within the same WatermarkGenerator. This way,
> a single WatermarkGenerator can be applied to the HybridSource that has the
> correct behavior for each underlying source. I haven't confirmed everything
> works as expected, but I am pretty optimistic.
>
> Also, in my last message, I was mistaken about the Iceberg source creating
> watermarks internally. In a conversation with Steven Wu, he mentioned that
> the Iceberg source does not support generating watermarks inside. I don't
> quite have a full understanding of how watermarks are generated within
> sources & was confused by the event-time-aligned assigner code's in-memory
> "watermark" tracker -- it may be that these aren't watermarks, but instead
> are just timestamp trackers used for aligning splits.
>
> Kind regards,
> David

Re: HybridSource: how to specify different watermark strategies for underlying sources

Posted by David Christle via user <us...@flink.apache.org>.
Hi Martijn,

That's correct. We have a forked version of the PR from Ryan Skraba (
https://github.com/apache/flink/pull/18823), which itself is originally
based on the PR from Jakob Edding (
https://github.com/apache/flink/pull/15152), and have incorporated an old
PR (https://github.com/apache/flink/pull/9388) for retries from Richard
Deurwaarder, the main author of the non-FLIP-27 PubSub source. Some changes
suggested in code review & other cosmetic changes are also in it.

I'm aware of the ongoing efforts to split out connectors into separate
repositories, and I'd planned on opening a new PR somewhere -- either the
flink repository, or a new one -- to contribute it upstream. Any
suggestions on how to proceed? The forked code runs without issues on our
production traffic.

As far as HybridSource & watermarking are concerned, I realized that I
can use the switch timestamp to determine whether the underlying Iceberg or
PubSub source is being used, within the same WatermarkGenerator. This way,
a single WatermarkGenerator can be applied to the HybridSource that has the
correct behavior for each underlying source. I haven't confirmed everything
works as expected, but I am pretty optimistic.
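
To make the switch-timestamp idea above concrete, here is a minimal
standalone Java sketch. It is not the code from the job described in this
thread and has no Flink dependency: SwitchingWatermarkSketch and
SwitchingWatermarkDemo are hypothetical names, the method names only echo
the onEvent / onPeriodicEmit shape of Flink's WatermarkGenerator, and the
one-day and five-second lags are assumed values. It also assumes that every
record with an event timestamp before the switch timestamp came from the
bounded (Iceberg) source and every later record from the unbounded (PubSub)
source.

```java
import java.time.Duration;

/** Hypothetical, Flink-free sketch: one generator, two lags, chosen by switch timestamp. */
final class SwitchingWatermarkSketch {
    private final long switchTimestamp; // assumed boundary between the two sources
    private final long historicalLagMillis;
    private final long liveLagMillis;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastWatermark = Long.MIN_VALUE;

    SwitchingWatermarkSketch(long switchTimestamp, Duration historicalLag, Duration liveLag) {
        this.switchTimestamp = switchTimestamp;
        this.historicalLagMillis = historicalLag.toMillis();
        this.liveLagMillis = liveLag.toMillis();
    }

    /** Records one event timestamp (assumed to be epoch milliseconds). */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Returns the watermark to emit; it never moves backwards. */
    long onPeriodicEmit() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return lastWatermark; // nothing seen yet
        }
        // Before the switch timestamp use the large lag, afterwards the small one.
        long lag = maxTimestamp < switchTimestamp ? historicalLagMillis : liveLagMillis;
        lastWatermark = Math.max(lastWatermark, maxTimestamp - lag - 1);
        return lastWatermark;
    }
}

final class SwitchingWatermarkDemo {
    public static void main(String[] args) {
        SwitchingWatermarkSketch generator = new SwitchingWatermarkSketch(
                200_000_000L, Duration.ofDays(1), Duration.ofSeconds(5));
        generator.onEvent(150_000_000L); // before the switch: one-day lag applies
        System.out.println(generator.onPeriodicEmit());
        generator.onEvent(200_000_500L); // after the switch: five-second lag applies
        System.out.println(generator.onPeriodicEmit());
    }
}
```

Because the lag shrinks once timestamps cross the switch timestamp, the
watermark jumps forward at the handover. Whether that jump is acceptable
depends on the windows and timers downstream.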

Also, in my last message, I was mistaken about the Iceberg source creating
watermarks internally. In a conversation with Steven Wu, he mentioned that
the Iceberg source does not support generating watermarks internally. I don't
quite have a full understanding of how watermarks are generated within
sources & was confused by the event-time-aligned assigner code's in-memory
"watermark" tracker -- it may be that these aren't watermarks, but instead
are just timestamp trackers used for aligning splits.

Kind regards,
David

On Tue, Jan 17, 2023 at 6:01 AM Martijn Visser <ma...@apache.org>
wrote:

> Hi David,
>
> While I don't immediately have an answer to your question, I was triggered
> by your inclusion of "an unbounded FLIP-27 PubSub source". Do you mean your
> own (forked) version of GCP using the new interfaces, since the current GCP
> connector is not a FLIP-27 compatible source?
>
> Best regards,
>
> Martijn

Re: HybridSource: how to specify different watermark strategies for underlying sources

Posted by Martijn Visser <ma...@apache.org>.
Hi David,

While I don't immediately have an answer to your question, I was triggered
by your inclusion of "an unbounded FLIP-27 PubSub source". Do you mean your
own (forked) version of GCP using the new interfaces, since the current GCP
connector is not a FLIP-27 compatible source?

Best regards,

Martijn
