Posted to user@flink.apache.org by yidan zhao <hi...@gmail.com> on 2021/03/01 12:00:29 UTC

how to propagate watermarks across multiple jobs

I have a job with 50+ tasks. I want to split it into multiple jobs that
exchange data through Kafka, but how should watermarks be handled?

Has anyone done something similar and solved this problem?

Here is an example.
The original job: kafkaStream1(src-topic) => xxxProcess => xxxWindow1 ==>
xxxWindow2 ==> resultSinkToKafka(result-topic).

The new job1: kafkaStream1(src-topic) => xxxProcess => xxxWindow1 ==>
resultSinkToKafka(mid-topic).
The new job2: kafkaStream1(mid-topic) => xxxWindow2 ==>
resultSinkToKafka(result-topic).

The watermarks for window1 and window2 are now generated separately in the
two jobs. This also seems to work, but it introduces a 5-minute delay for
window2 (both windows have a 5-minute cycle).

The key problem is that, because the window cycle is 5 minutes, window2 is
delayed by 5 minutes.
If the watermark could be transferred between jobs, this would no longer be
a problem.
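To make the delay concrete, here is a minimal plain-Java model of the two-job setup (an illustrative sketch, not Flink code; the class and method names are hypothetical). It assumes that job1's watermark tracks wall-clock time, that each window1 result carries the window's maximum timestamp (end - 1), and that job2 regenerates its watermark as "largest timestamp seen minus 1", as a bounded-out-of-orderness strategy with a zero bound would. Under these assumptions, the first window2 cannot fire until the next window1 result arrives, one full cycle later.

```java
// Plain-Java model of the two-job pipeline; all names are hypothetical.
class WindowDelaySketch {
    static final long WINDOW_MS = 5 * 60 * 1000L; // both windows are 5-minute tumbling windows

    /**
     * Wall-clock time at which the window2 instance ending at window2End fires in job2, assuming:
     * job1 emits window1's result for the window ending at `end` at wall time `end`,
     * the result carries timestamp end - 1, and job2's watermark is (max timestamp seen - 1).
     */
    static long job2FireWallTime(long window2End) {
        long maxTs = Long.MIN_VALUE;
        for (long end = WINDOW_MS; ; end += WINDOW_MS) {
            long recordTs = end - 1;                // window1 result timestamp (window max timestamp)
            maxTs = Math.max(maxTs, recordTs);
            long watermark = maxTs - 1;             // job2's regenerated watermark
            if (watermark >= window2End - 1) {      // event-time trigger: watermark >= window max timestamp
                return end;                         // this record reached job2 at wall time `end`
            }
        }
    }

    /** In the single job, window2 sees job1's own watermark and fires right after window1. */
    static long singleJobFireWallTime(long window2End) {
        return window2End;
    }

    public static void main(String[] args) {
        long delayMs = job2FireWallTime(WINDOW_MS) - singleJobFireWallTime(WINDOW_MS);
        System.out.println("extra delay for window2: " + delayMs / 60_000 + " min");
    }
}
```

Running `main` prints an extra delay of 5 minutes, i.e. exactly one window cycle, which matches the delay observed above.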

Re: how to propagate watermarks across multiple jobs

Posted by yidan zhao <hi...@gmail.com>.
Also, do you know when the Kafka consumer/producer will be re-implemented
on the new source/sink API? I am wondering whether I should adjust my code
now, since I will have to adjust it again once the connector is rebuilt on
the new source/sink API.

On Thu, Mar 4, 2021 at 4:44 PM yidan zhao <hi...@gmail.com> wrote:

Re: how to propagate watermarks across multiple jobs

Posted by yidan zhao <hi...@gmail.com>.
I uploaded a picture to describe that.
https://ftp.bmp.ovh/imgs/2021/03/2068f2e22045e696.png

Re: how to propagate watermarks across multiple jobs

Posted by yidan zhao <hi...@gmail.com>.
Thank you.

On Thu, Mar 4, 2021 at 11:10 PM Yuan Mei <yu...@gmail.com> wrote:

Re: how to propagate watermarks across multiple jobs

Posted by Yuan Mei <yu...@gmail.com>.
Hey Yidan,

KafkaShuffle was initially motivated by materializing shuffle data on
Kafka, and it started as a limited version that supports hash partitioning
only. Watermarks are maintained and forwarded as part of the shuffle data.
So you are right: the watermark storing/forwarding logic has nothing to do
with whether the stream is keyed or not. If I remember correctly, the
current approach in KafkaShuffle should also work for non-keyed streams.
So, yes, the logic can be extracted and generalized.
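The consumer side of this scheme can be sketched as follows. This is a hypothetical helper written for illustration, not the actual KafkaShuffle code: assuming every producer subtask broadcasts its watermark, tagged with its subtask index, to all partitions, a consumer keeps the latest watermark per producer and advances only to the minimum across producers. Nothing in it looks at keys, which is why the same logic applies to non-keyed streams.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical consumer-side helper, not the actual KafkaShuffle implementation.
class WatermarkMerger {
    private final long[] latest;           // latest watermark seen from each producer subtask
    private long emitted = Long.MIN_VALUE; // last watermark handed downstream

    WatermarkMerger(int numProducers) {
        latest = new long[numProducers];
        Arrays.fill(latest, Long.MIN_VALUE); // "nothing heard yet" from any producer
    }

    /**
     * Called for every watermark marker read from Kafka. The same producer's marker may arrive
     * via several partitions, so per-producer values only move forward. Returns a new watermark
     * only when the minimum over all producers has advanced.
     */
    OptionalLong onMarker(int producerSubtask, long watermark) {
        latest[producerSubtask] = Math.max(latest[producerSubtask], watermark);
        long min = Arrays.stream(latest).min().getAsLong();
        if (min > emitted) {
            emitted = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        WatermarkMerger merger = new WatermarkMerger(2);
        System.out.println(merger.onMarker(0, 100)); // empty: producer 1 has not reported yet
        System.out.println(merger.onMarker(1, 80));  // 80: the slower producer bounds the result
        System.out.println(merger.onMarker(1, 120)); // 100: now producer 0 is the slowest
    }
}
```

Taking the minimum mirrors how Flink combines watermarks from multiple input channels: a consumer must not run ahead of its slowest upstream producer.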

Best,

Yuan

On Thu, Mar 4, 2021 at 4:26 PM yidan zhao <hi...@gmail.com> wrote:

Re: how to propagate watermarks across multiple jobs

Posted by yidan zhao <hi...@gmail.com>.
One more question: if I only need the watermark logic, not a keyedStream,
why not provide methods such as writeDataStream and readDataStream? They
would use similar mechanisms: the Kafka producer sinks the records and
broadcasts the watermark to all partitions, and the Kafka consumers read it
and regenerate the watermark. I think this would be more general. This
way, the Kafka consumer reads the stream from Kafka and can still call
keyBy to get a keyedStream. I don't know why KafkaShuffle only considers
the 'keyedStream' case.
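The producer half of the proposed writeDataStream could look roughly like the sketch below. writeDataStream/readDataStream are only proposals from this thread, not existing Flink APIs, and the class, the text encoding, and the Outgoing stand-in for a Kafka producer record are all hypothetical. Data records go to a single partition (round-robin here, since the stream need not be keyed), while each watermark is copied to every partition so that any consumer can regenerate it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical producer-side writer; not a Flink or Kafka API.
class WatermarkBroadcastingWriter {
    /** Stand-in for a Kafka producer record: target partition plus encoded value. */
    static final class Outgoing {
        final int partition;
        final String value;

        Outgoing(int partition, String value) {
            this.partition = partition;
            this.value = value;
        }
    }

    private final int subtaskIndex;   // which producer subtask this writer belongs to
    private final int numPartitions;  // partitions of the intermediate topic
    private final List<Outgoing> sent = new ArrayList<>();
    private int nextPartition = 0;    // round-robin target for non-keyed data

    WatermarkBroadcastingWriter(int subtaskIndex, int numPartitions) {
        this.subtaskIndex = subtaskIndex;
        this.numPartitions = numPartitions;
    }

    /** A data record goes to exactly one partition. */
    void writeRecord(long timestamp, String value) {
        sent.add(new Outgoing(nextPartition, "D|" + timestamp + "|" + value));
        nextPartition = (nextPartition + 1) % numPartitions;
    }

    /** A watermark is copied to every partition, tagged with this producer's subtask index. */
    void writeWatermark(long watermark) {
        for (int p = 0; p < numPartitions; p++) {
            sent.add(new Outgoing(p, "W|" + subtaskIndex + "|" + watermark));
        }
    }

    List<Outgoing> sent() {
        return sent;
    }

    public static void main(String[] args) {
        WatermarkBroadcastingWriter writer = new WatermarkBroadcastingWriter(0, 3);
        writer.writeRecord(1_000L, "a");
        writer.writeRecord(1_500L, "b");
        writer.writeWatermark(1_400L);
        for (Outgoing o : writer.sent()) {
            System.out.println("partition " + o.partition + " <- " + o.value);
        }
    }
}
```

On the read side, a consumer would combine the copies from all producer subtasks by taking the minimum, and could then call keyBy on the resulting stream as usual.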

On Thu, Mar 4, 2021 at 3:54 PM Piotr Nowojski <pn...@apache.org> wrote:

Re: how to propagate watermarks across multiple jobs

Posted by Piotr Nowojski <pn...@apache.org>.
Great :)

Just one more note: FlinkKafkaShuffle currently has a critical bug [1]
that will probably prevent you from using it directly. I hope it will be
fixed in an upcoming release. In the meantime, you can use its source code
as inspiration for your own solution.

Best,
Piotrek


[1] https://issues.apache.org/jira/browse/FLINK-21317

On Thu, Mar 4, 2021 at 3:48 AM yidan zhao <hi...@gmail.com> wrote:

Re: how to propagate watermarks across multiple jobs

Posted by yidan zhao <hi...@gmail.com>.
Yes, you are right, thank you. I took a brief look at what
FlinkKafkaShuffle does; it seems to be what I need, and I will give it a try.

Re: how to propagate watermarks across multiple jobs

Posted by Piotr Nowojski <pn...@apache.org>.
Hi,

Can't you write the watermark as a special event to the "mid-topic"? In
the "new job2" you would parse this event and use it to assign the
watermark before `xxxWindow2`. I believe this is what FlinkKafkaShuffle
does [1]; you could look at its code for inspiration.
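A minimal sketch of such a "special event", assuming a simple text encoding (the MidTopicEvent class and the `D|`/`W|` format are hypothetical, chosen for readability). Job1 would write a watermark event after window1's results; job2 parses each value and advances its watermark only on watermark events, instead of deriving it from record timestamps. How job1 captures its watermarks and how job2 hands the parsed value to Flink are not shown, and this single-producer version ignores parallelism.

```java
// Hypothetical mid-topic envelope; the "D|..." / "W|..." text format is an assumption.
class MidTopicEvent {
    final boolean isWatermark;
    final long timestamp;  // record timestamp, or the watermark value for watermark events
    final String payload;  // null for watermark events

    private MidTopicEvent(boolean isWatermark, long timestamp, String payload) {
        this.isWatermark = isWatermark;
        this.timestamp = timestamp;
        this.payload = payload;
    }

    static MidTopicEvent data(long timestamp, String payload) {
        return new MidTopicEvent(false, timestamp, payload);
    }

    static MidTopicEvent watermark(long watermark) {
        return new MidTopicEvent(true, watermark, null);
    }

    /** Job1 side: the string handed to the Kafka sink for mid-topic. */
    String encode() {
        return isWatermark ? "W|" + timestamp : "D|" + timestamp + "|" + payload;
    }

    /** Job2 side: parse a value read from mid-topic (limit 3 keeps '|' inside payloads). */
    static MidTopicEvent decode(String value) {
        String[] parts = value.split("\\|", 3);
        long ts = Long.parseLong(parts[1]);
        return parts[0].equals("W") ? watermark(ts) : data(ts, parts[2]);
    }

    public static void main(String[] args) {
        long currentWatermark = Long.MIN_VALUE;
        String[] midTopic = {"D|299999|sum=7", "W|299999", "D|599999|sum=3"};
        for (String value : midTopic) {
            MidTopicEvent e = decode(value);
            if (e.isWatermark) {
                // Watermarks come only from job1's events, never from data timestamps.
                currentWatermark = Math.max(currentWatermark, e.timestamp);
                System.out.println("watermark -> " + currentWatermark);
            } else {
                System.out.println("data @" + e.timestamp + ": " + e.payload);
            }
        }
    }
}
```

With the watermark event for 299999 (the last millisecond of the first 5-minute window), job2 can fire window2 as soon as that event arrives, rather than waiting for the next window1 result.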

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.html

On Mon, Mar 1, 2021 at 1:01 PM yidan zhao <hi...@gmail.com> wrote:
