Posted to user@flink.apache.org by xie wei <ji...@googlemail.com> on 2017/07/28 13:00:52 UTC

Is watermark used by joining two streams

Hello,

I want to join two streams based on an event-time window. Each stream has its
own watermark: one has periodic watermarks and the other has punctuated
watermarks.
Are the watermarks used to trigger the join? If yes, which one, and how is
it used?

Thank you and best regards
Wei

Re: Re: Is watermark used by joining two streams

Posted by "G.S.Vijay Raajaa" <gs...@gmail.com>.
Hi Fabian,

Thanks for the reply. I shall try the CoProcessFunction implementation.

Currently, I am trying to assign watermarks on the keyed stream. Please find
a snippet of the code below for better understanding:

// env, properties, JsonDeserializerv5, TimestampExtractorMergerv5 and
// JsonMergerv5 are defined elsewhere (not shown).
List<String> names = new ArrayList<>();
names.add("stream_a");
names.add("stream_b");

DataStream<String> messageStream = env.addSource(
    new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));

KeyedStream<Tuple2<String, JsonObject>, Tuple> pojo =
    messageStream.map(new JsonDeserializerv5()).keyBy(0);

SingleOutputStreamOperator<Tuple2<String, JsonObject>> watermarkStream =
    pojo.assignTimestampsAndWatermarks(new TimestampExtractorMergerv5());

DataStream<JsonObject> merge_stream =
    watermarkStream.keyBy(0).countWindow(2).apply(new JsonMergerv5());

The snippet above merges on the timestamp (field 0 of the tuple). However,
the apply function sees elements out of order: even when the streams are
joined at t2, which is less than the watermark, they are still processed by
the apply function. Please let me know whether I am using watermarks
incorrectly or have misunderstood how they work.
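A possible explanation, offered as an assumption rather than a confirmed diagnosis: countWindow(2) is count-based (in Flink it is a global window with a purging count trigger), so it fires on every second element per key in arrival order and never looks at watermarks. Assigning watermarks upstream therefore does not change when apply() runs. An event-time window, for example window(TumblingEventTimeWindows.of(Time.seconds(10))) with a hypothetical 10-second size, fires only once the watermark passes the window's end. The plain-Java model below has no Flink dependency and uses hypothetical names; it contrasts the two firing rules on the same out-of-order input, dropping late elements as zero allowed lateness would.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model contrasting a count trigger with an event-time trigger.
class CountVsEventTimeWindow {

    // One input item in arrival order: either an element or a watermark.
    static final class Item {
        final boolean isWatermark;
        final long time;
        Item(boolean isWatermark, long time) { this.isWatermark = isWatermark; this.time = time; }
        static Item element(long ts) { return new Item(false, ts); }
        static Item watermark(long wm) { return new Item(true, wm); }
    }

    // Like countWindow(n): fires on every n-th element; watermarks are ignored.
    static List<List<Long>> countWindows(List<Item> input, int n) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        for (Item item : input) {
            if (item.isWatermark) continue;
            buffer.add(item.time);
            if (buffer.size() == n) {
                fired.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        return fired;
    }

    // Like a tumbling event-time window: elements are grouped into
    // [start, start + size) and a window fires once the watermark reaches
    // its last timestamp (start + size - 1).
    static List<List<Long>> eventTimeWindows(List<Item> input, long size) {
        List<List<Long>> fired = new ArrayList<>();
        TreeMap<Long, List<Long>> open = new TreeMap<>(); // window start -> elements
        long watermark = Long.MIN_VALUE;
        for (Item item : input) {
            if (!item.isWatermark) {
                long start = item.time - Math.floorMod(item.time, size);
                if (start + size - 1 <= watermark) continue; // window already fired: late, dropped
                open.computeIfAbsent(start, k -> new ArrayList<>()).add(item.time);
            } else {
                watermark = Math.max(watermark, item.time);
                while (!open.isEmpty() && open.firstKey() + size - 1 <= watermark) {
                    fired.add(open.pollFirstEntry().getValue());
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Item> input = List.of(
            Item.element(12), Item.element(3), Item.watermark(5),
            Item.element(7), Item.element(14), Item.watermark(20));
        System.out.println(countWindows(input, 2));      // [[12, 3], [7, 14]]
        System.out.println(eventTimeWindows(input, 10)); // [[3, 7], [12, 14]]
    }
}
```

With the count rule the first pair mixes timestamps 12 and 3 from different ten-unit windows; with the event-time rule each window is emitted only after the watermark has passed it.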

Regards,

Vijay Raajaa G S

On Mon, Jul 31, 2017 at 2:09 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Vijay,
>
> there are many ways to implement joins with a stateful CoProcessFunction.
> It gives you access to the timestamps of records and you can register
> timers that trigger when a certain time is reached.
> It is basically up to you how you join and emit data. You can drop late
> data or emit it. Please note that records are emitted either with their
> current timestamp (if in processElement()) or with the timestamp of the
> timer that fired (in onTimer()).
>
> Hope this helps,
> Fabian

Re: Re: Is watermark used by joining two streams

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Vijay,

there are many ways to implement joins with a stateful CoProcessFunction.
It gives you access to the timestamps of records and you can register
timers that trigger when a certain time is reached.
It is basically up to you how you join and emit data. You can drop late
data or emit it. Please note that records are emitted either with their
current timestamp (if in processElement()) or with the timestamp of the
timer that fired (in onTimer()).
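The pattern described above can be illustrated with a plain-Java model; it is not the Flink API and has no Flink dependency. The hypothetical class ToyCoProcessJoin stands in for a keyed CoProcessFunction: processLeft and processRight play the roles of processElement1 and processElement2, an in-memory map stands in for keyed state, and onWatermark imitates event-time timers firing. Assumptions made only for this sketch: an element joins with same-key elements of the other side that fall in the same tumbling window, elements at or behind the current watermark are dropped as late, and each joined result carries the timestamp of the timer that produced it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a keyed two-input join in the style of a CoProcessFunction.
class ToyCoProcessJoin {
    private final long windowSize;
    private long currentWatermark = Long.MIN_VALUE;
    // Pending "timers": window end timestamp -> buffered {side, key, value} entries.
    private final TreeMap<Long, List<String[]>> timers = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    ToyCoProcessJoin(long windowSize) {
        this.windowSize = windowSize;
    }

    // Counterparts of processElement1 / processElement2.
    void processLeft(String key, String value, long ts) { buffer("L", key, value, ts); }
    void processRight(String key, String value, long ts) { buffer("R", key, value, ts); }

    private void buffer(String side, String key, String value, long ts) {
        if (ts <= currentWatermark) {
            return; // late element: this sketch chooses to drop it
        }
        // Last timestamp of the tumbling window [start, start + windowSize).
        long windowEnd = ts - Math.floorMod(ts, windowSize) + windowSize - 1;
        timers.computeIfAbsent(windowEnd, k -> new ArrayList<>())
              .add(new String[] {side, key, value});
    }

    // Advancing event time fires every timer at or before the new watermark.
    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        while (!timers.isEmpty() && timers.firstKey() <= currentWatermark) {
            Map.Entry<Long, List<String[]>> timer = timers.pollFirstEntry();
            onTimer(timer.getKey(), timer.getValue());
        }
    }

    // Counterpart of onTimer(): join same-key pairs; results carry the timer timestamp.
    private void onTimer(long timerTimestamp, List<String[]> buffered) {
        for (String[] l : buffered) {
            if (!l[0].equals("L")) continue;
            for (String[] r : buffered) {
                if (r[0].equals("R") && r[1].equals(l[1])) {
                    output.add(l[1] + ":" + l[2] + "+" + r[2] + "@" + timerTimestamp);
                }
            }
        }
    }

    List<String> output() { return output; }

    public static void main(String[] args) {
        ToyCoProcessJoin join = new ToyCoProcessJoin(10);
        join.processLeft("a", "l1", 3);
        join.processRight("a", "r1", 7);
        join.processRight("b", "r2", 8);   // no left partner for key "b"
        join.onWatermark(9);               // passes window end 9: the timer fires
        join.processLeft("a", "late", 5);  // behind the watermark: dropped
        join.onWatermark(20);
        System.out.println(join.output()); // [a:l1+r1@9]
    }
}
```

In an actual job the buffers would live in keyed state and the timers would be registered via ctx.timerService().registerEventTimeTimer(windowEnd); timers there are scoped per key, whereas this toy shares one timer per window and separates keys when it fires.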

Hope this helps,
Fabian



2017-07-31 9:48 GMT+02:00 G.S.Vijay Raajaa <gs...@gmail.com>:

> My bad. I meant only join. I am currently using keyBy on a timestamp
> common across the streams.
>
> Regards,
> Vijay Raajaa GS

Re: Re: Is watermark used by joining two streams

Posted by "G.S.Vijay Raajaa" <gs...@gmail.com>.
My bad. I meant only join. I am currently using keyBy on a timestamp common
across the streams.

Regards,
Vijay Raajaa GS

On Mon, Jul 31, 2017 at 1:16 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> @Wei: You can implement very different behavior using a CoProcessFunction.
> However, if your operator is time-based, the logical time of the operator
> will be the minimum time of both streams (time of the "slower" watermark).
>
> @Vijay: I did not understand what your requirements are. Do you want to
> join or merge streams? Those are two different things. This thread
> discusses joins, not merging.
>
> Best,
> Fabian

Re: Re: Is watermark used by joining two streams

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

@Wei: You can implement very different behavior using a CoProcessFunction.
However, if your operator is time-based, the logical time of the operator
will be the minimum time of both streams (time of the "slower" watermark).

@Vijay: I did not understand what your requirements are. Do you want to
join or merge streams? Those are two different things. This thread
discusses joins, not merging.

Best,
Fabian

2017-07-31 4:24 GMT+02:00 G.S.Vijay Raajaa <gs...@gmail.com>:

> Hi Fabian,
>
> How do I order by the merge time? Let's say I merge the streams at T1. I
> want to drop the merge at T2 if T2 < T1. Depending on when data arrives
> from each stream and when the merge happens, the merged results end up
> out of order. Any thoughts would be really appreciated.
>
> Regards,
> Vijay Raajaa GS

Re: Re: Is watermark used by joining two streams

Posted by "G.S.Vijay Raajaa" <gs...@gmail.com>.
Hi Fabian,

How do I order by the merge time? Let's say I merge the streams at T1. I
want to drop the merge at T2 if T2 < T1. Depending on when data arrives
from each stream and when the merge happens, the merged results end up
out of order. Any thoughts would be really appreciated.
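One possible reading of this requirement is: never emit a merged result whose timestamp is older than one already emitted for the same key. Under that assumption, a plain-Java sketch follows. The class name is hypothetical, and in a Flink job the map would more likely be per-key state such as a ValueState<Long> inside a keyed function.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical filter: drop a merge result older than the newest one already
// emitted for the same key (the "T2 < T1" case).
class DropOutOfOrderMerges {
    private final Map<String, Long> lastEmitted = new HashMap<>();

    // Returns true if the result should be emitted, false if it is dropped.
    boolean accept(String key, long mergeTime) {
        Long last = lastEmitted.get(key);
        if (last != null && mergeTime < last) {
            return false; // older than what was already emitted for this key
        }
        lastEmitted.put(key, mergeTime);
        return true;
    }

    public static void main(String[] args) {
        DropOutOfOrderMerges filter = new DropOutOfOrderMerges();
        System.out.println(filter.accept("k", 100));    // true  (T1)
        System.out.println(filter.accept("k", 90));     // false (T2 < T1, dropped)
        System.out.println(filter.accept("k", 120));    // true
        System.out.println(filter.accept("other", 50)); // true  (tracked per key)
    }
}
```

Note that this discards stale results rather than reordering them; putting results back in order would instead require buffering until a watermark passes, as event-time windows do.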

Regards,
Vijay Raajaa GS

On Jul 31, 2017 1:14 AM, "wei" <ji...@googlemail.com> wrote:

Hello Fabian,

Thank you for your answer!

Does it mean that the operator waits until it has received a watermark from
each of the two input streams and then emits the “slower” watermark?

Best regards

Wei

Re: Is watermark used by joining two streams

Posted by wei <ji...@googlemail.com>.
Hello Fabian,

Thank you for your answer!

Does it mean that the operator waits until it has received a watermark from each of the two input streams and then emits the “slower” watermark?

Best regards

Wei

From: Fabian Hueske [mailto:fhueske@gmail.com]
Sent: Sunday, July 30, 2017 11:17 AM
To: xie wei
Cc: user
Subject: Re: Is watermark used by joining two streams

Periodic and punctuated watermarks only differ in the way that they are generated. Afterwards they are treated the same.

An operator with two input streams will always sync its own watermarks to the watermarks of both input streams, i.e., to the "slower" watermark of both inputs.

So if the left input says it is 12:14 and the right says it is 11:53, the operator will have an internal time of 11:53 and emit watermarks according to that time.

Hope that helps,

Fabian

Re: Is watermark used by joining two streams

Posted by Fabian Hueske <fh...@gmail.com>.
Periodic and punctuated watermarks only differ in the way that they are
generated. Afterwards they are treated the same.
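That the two kinds differ only in how they are generated can be made concrete with a small plain-Java model. It has no Flink dependency and the class and method names are hypothetical: the periodic generator loosely follows the idea of a bounded-out-of-orderness assigner that is polled on a timer, and the punctuated one emits a watermark only for marker events. Both end up producing the same thing, a long event-time value, which is why downstream operators treat them the same.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Flink's API): both generators produce plain long
// watermarks, so a downstream operator cannot tell them apart.
class WatermarkGenerators {

    // Periodic style: track the highest timestamp seen so far and, whenever a
    // timer tick fires, emit (maxTimestamp - maxOutOfOrderness).
    static final class Periodic {
        private final long maxOutOfOrderness;
        private long maxTimestamp = Long.MIN_VALUE;

        Periodic(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        void onEvent(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        // Driven by wall-clock ticks, not by the events themselves.
        long onPeriodicTick() {
            return maxTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : maxTimestamp - maxOutOfOrderness;
        }
    }

    // Punctuated style: an event may itself signal that a watermark is due.
    static final class Punctuated {
        // Returns the watermark to emit, or null if this event is not a marker.
        Long onEvent(long timestamp, boolean isMarker) {
            return isMarker ? Long.valueOf(timestamp) : null;
        }
    }

    public static void main(String[] args) {
        long[] timestamps = {10, 30, 20};
        boolean[] markers = {false, false, true}; // only the last event is a marker

        Periodic periodic = new Periodic(5);
        Punctuated punctuated = new Punctuated();
        List<Long> punctuatedWatermarks = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            periodic.onEvent(timestamps[i]);
            Long wm = punctuated.onEvent(timestamps[i], markers[i]);
            if (wm != null) punctuatedWatermarks.add(wm);
        }
        System.out.println("periodic tick: " + periodic.onPeriodicTick()); // periodic tick: 25
        System.out.println("punctuated: " + punctuatedWatermarks);         // punctuated: [20]
    }
}
```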
An operator with two input streams will always sync its own watermarks to
the watermarks of both input streams, i.e., to the "slower" watermark of
both inputs.
So if the left input says it is 12:14 and the right says it is 11:53, the
operator will have an internal time of 11:53 and emit watermarks according
to that time.
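The rule for two-input operators can be sketched as a small plain-Java model. It assumes, as I understand Flink's two-input operators to work, that each input's watermark starts at Long.MIN_VALUE and that the operator forwards a new watermark only when the minimum of the two inputs advances; the class TwoInputWatermark is hypothetical. Times are minutes since midnight so the 12:14 / 11:53 example can be checked directly.

```java
import java.util.OptionalLong;

// Hypothetical model of how a two-input operator derives its event-time clock.
class TwoInputWatermark {
    private long input1 = Long.MIN_VALUE;   // last watermark from the left input
    private long input2 = Long.MIN_VALUE;   // last watermark from the right input
    private long combined = Long.MIN_VALUE; // the operator's own event time

    OptionalLong onWatermark1(long wm) { input1 = wm; return advance(); }
    OptionalLong onWatermark2(long wm) { input2 = wm; return advance(); }

    // The operator's time is the slower (smaller) of the two inputs; a new
    // watermark is forwarded downstream only when that minimum moves forward.
    private OptionalLong advance() {
        long min = Math.min(input1, input2);
        if (min > combined) {
            combined = min;
            return OptionalLong.of(combined);
        }
        return OptionalLong.empty();
    }

    long currentTime() { return combined; }

    static long hhmm(int h, int m) { return h * 60L + m; }

    public static void main(String[] args) {
        TwoInputWatermark op = new TwoInputWatermark();
        // Only the left input has reported: nothing is forwarded yet.
        System.out.println(op.onWatermark1(hhmm(12, 14)).isPresent()); // false
        // The right input reports 11:53: the operator's time becomes 11:53 (713 minutes).
        System.out.println(op.onWatermark2(hhmm(11, 53)).getAsLong()); // 713
    }
}
```

A consequence of taking the minimum is that nothing is forwarded while one input has not yet sent any watermark, so a single slow or idle input holds back event time for the whole operator.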

Hope that helps,
Fabian


2017-07-28 15:00 GMT+02:00 xie wei <ji...@googlemail.com>:

> Hello,
>
> I want to join two streams based on an event-time window. Each stream has
> its own watermark: one has periodic watermarks and the other has punctuated
> watermarks.
> Are the watermarks used to trigger the join? If yes, which one, and how is
> it used?
>
> Thank you and best regards
> Wei