Posted to user@flink.apache.org by Chengzhi Zhao <w....@gmail.com> on 2018/04/25 18:20:26 UTC

Multiple Streams Connect Watermark

Hi, everyone,

I am trying to build a join-like pipeline using Flink's connect operator and
CoProcessFunction. I have a use case where I need to connect 3+ streams, so I
have something like this:

A ---+
     +--> C --+
B ---+        +--> E
          D --+

Streams A and B are connected first, with the low watermark 3 hours late.
After the data has been emitted (the output stream C), a new stream D is
connected to C and E is emitted as the final output. I was wondering how the
downstream watermark should be defined. Should I give stream C a new watermark
with a 3-hour delay again? Or, when I connect stream D, will everything be 6
hours late on the low watermark?

I am using BoundedOutOfOrdernessGenerator[1] with a maxOutOfOrderness of 3 hours.

Thanks for your tips and help in advance.

Best,
Chengzhi

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks
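For context, the generator referenced in [1] tracks the highest event timestamp seen so far and periodically emits a watermark that trails it by a fixed bound. The sketch below is a dependency-free model of that logic using the 3-hour bound from the question, so it runs without Flink. The class name BoundedOutOfOrdernessModel and its methods are hypothetical stand-ins, not Flink's AssignerWithPeriodicWatermarks interface; starting the maximum at Long.MIN_VALUE plus the bound is an assumption made here to avoid underflow before the first element arrives.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, dependency-free model (not Flink's API) of a bounded
 * out-of-orderness watermark generator: the watermark trails the highest
 * event timestamp seen so far by a fixed bound.
 */
public class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;

    public BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Assumption: start low enough that the first watermark is
        // Long.MIN_VALUE without arithmetic underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Per element: remember the largest timestamp, return the timestamp unchanged. */
    public long extractTimestamp(long eventTimeMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimeMs);
        return eventTimeMs;
    }

    /** Periodically: the watermark lags the max timestamp by the bound. */
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        long threeHours = TimeUnit.HOURS.toMillis(3);
        BoundedOutOfOrdernessModel gen = new BoundedOutOfOrdernessModel(threeHours);
        gen.extractTimestamp(TimeUnit.HOURS.toMillis(10)); // event at 10:00
        gen.extractTimestamp(TimeUnit.HOURS.toMillis(8));  // out-of-order event at 08:00
        // Watermark = 10:00 - 3h = 07:00, so this prints true.
        System.out.println(gen.currentWatermark() == TimeUnit.HOURS.toMillis(7));
    }
}
```

Note that the late 08:00 event does not move the watermark backwards; only a new maximum timestamp advances it.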

Re: Multiple Streams Connect Watermark

Posted by Chengzhi Zhao <w....@gmail.com>.
Got it, thanks a lot, Fabian. Looking forward to seeing your book.

Best,
Chengzhi

On Thu, Apr 26, 2018 at 4:02 PM, Fabian Hueske <fh...@gmail.com> wrote:

> You can also merge all three types into an n-ary Either type and union all
> three inputs together.
> However, Flink only supports a binary Either, so you'd have to implement a
> custom TypeInformation and TypeSerializer to make that work.
>
> Best, Fabian

Re: Multiple Streams Connect Watermark

Posted by Fabian Hueske <fh...@gmail.com>.
You can also merge all three types into an n-ary Either type and union all
three inputs together.
However, Flink only supports a binary Either, so you'd have to implement a
custom TypeInformation and TypeSerializer to make that work.

Best, Fabian
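A rough sketch of the n-ary Either idea, assuming three inputs whose schemas are modelled here simply as String, Integer and Double: each record is wrapped in a three-way container with exactly one populated slot, so all inputs share one type (which is what DataStream#union requires) and a single downstream function can handle every variant. Either3 and its fold method are hypothetical names, not Flink classes, and the custom TypeInformation and TypeSerializer mentioned above are not shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical three-way "Either" (not a Flink class): exactly one of three
 * slots holds a value, so records of three different schemas share one type.
 */
public final class Either3<A, B, C> {
    private final int tag;       // 0 = first, 1 = second, 2 = third
    private final Object value;

    private Either3(int tag, Object value) {
        this.tag = tag;
        this.value = value;
    }

    public static <A, B, C> Either3<A, B, C> first(A a)  { return new Either3<>(0, a); }
    public static <A, B, C> Either3<A, B, C> second(B b) { return new Either3<>(1, b); }
    public static <A, B, C> Either3<A, B, C> third(C c)  { return new Either3<>(2, c); }

    /** Apply whichever function matches the variant this value holds. */
    @SuppressWarnings("unchecked")
    public <R> R fold(Function<? super A, ? extends R> onFirst,
                      Function<? super B, ? extends R> onSecond,
                      Function<? super C, ? extends R> onThird) {
        switch (tag) {
            case 0:  return onFirst.apply((A) value);
            case 1:  return onSecond.apply((B) value);
            default: return onThird.apply((C) value);
        }
    }

    public static void main(String[] args) {
        // Three "inputs" with different schemas, wrapped and merged into one list
        // (standing in for mapping each stream to Either3 and calling union).
        List<Either3<String, Integer, Double>> unioned = new ArrayList<>();
        for (String s : Arrays.asList("a", "b")) unioned.add(Either3.first(s));
        for (Integer i : Arrays.asList(1, 2)) unioned.add(Either3.second(i));
        unioned.add(Either3.third(3.5));

        // One function handles all three schemas.
        for (Either3<String, Integer, Double> e : unioned) {
            System.out.println(e.fold(s -> "first:" + s, i -> "second:" + i, d -> "third:" + d));
        }
    }
}
```

The integer tag keeps the sketch compact; a serializer for such a type would typically write the tag first and then delegate to the serializer of the populated slot.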

2018-04-26 20:44 GMT+02:00 Chengzhi Zhao <w....@gmail.com>:

> Thanks Fabian for the explanation.
>
> If I have data with different schemas, it seems the only option I have is
> to use connect to perform joins (inner, outer). Are there any operators that
> can put more than two streams together (all with different schemas)?
>
> Best,
> Chengzhi

Re: Multiple Streams Connect Watermark

Posted by Chengzhi Zhao <w....@gmail.com>.
Thanks Fabian for the explanation.

If I have data with different schemas, it seems the only option I have is
to use connect to perform joins (inner, outer). Are there any operators that
can put more than two streams together (all with different schemas)?

Best,
Chengzhi
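For reference, a join built on connect and a CoProcessFunction commonly buffers each input in keyed state and emits a result whenever the other side already holds a matching key. The following is a simplified, dependency-free model of an inner join under that assumption; TwoInputJoinModel, processLeft and processRight are hypothetical names standing in for a CoProcessFunction's processElement1 and processElement2, and plain HashMaps stand in for Flink's keyed state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model (not Flink's API) of an inner join on a two-input
 * operator: each side buffers records per key and emits a joined pair
 * whenever the other side already has records for that key.
 */
public class TwoInputJoinModel {
    private final Map<String, List<String>> leftState = new HashMap<>();
    private final Map<String, List<String>> rightState = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** First input (analogous to processElement1). */
    public void processLeft(String key, String value) {
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        for (String r : rightState.getOrDefault(key, Collections.emptyList())) {
            output.add(key + ":" + value + "|" + r);
        }
    }

    /** Second input (analogous to processElement2). */
    public void processRight(String key, String value) {
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        for (String l : leftState.getOrDefault(key, Collections.emptyList())) {
            output.add(key + ":" + l + "|" + value);
        }
    }

    public List<String> output() { return output; }

    public static void main(String[] args) {
        TwoInputJoinModel join = new TwoInputJoinModel();
        join.processLeft("k1", "a1");
        join.processRight("k2", "b0"); // no left record for k2 yet
        join.processRight("k1", "b1"); // matches a1
        join.processLeft("k1", "a2");  // matches b1
        System.out.println(join.output()); // [k1:a1|b1, k1:a2|b1]
    }
}
```

In this sketch the buffers grow without bound; a real job would typically register event-time timers to clear state once the watermark has passed, and an outer join would also use such timers to emit records that never found a match.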

On Thu, Apr 26, 2018 at 6:05 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Chengzhi,
>
> Functions in Flink are implemented in a way to preserve the timestamps of
> elements or assign timestamps which are aligned with the existing
> watermarks.
> For example, the result of a time window aggregation has the end timestamp
> of the window as a timestamp and records emitted by the onTimer() method
> have the timestamp of the timer as a record timestamp.
> So unless you fiddle with internal APIs to reset the record timestamps of
> elements, you don't need to worry about generating new watermarks.
>
> Best, Fabian

Re: Multiple Streams Connect Watermark

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Chengzhi,

Functions in Flink are implemented in a way to preserve the timestamps of
elements or assign timestamps which are aligned with the existing
watermarks.
For example, the result of a time window aggregation has the end timestamp
of the window as a timestamp and records emitted by the onTimer() method
have the timestamp of the timer as a record timestamp.
So unless you fiddle with internal APIs to reset the record timestamps of
elements, you don't need to worry about generating new watermarks.

Best, Fabian
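The 3-hour versus 6-hour question can be illustrated with a small model. It assumes that a two-input operator keeps the latest watermark received on each input and forwards the minimum of the two without adding any delay of its own, which is how Flink's two-input operators combine watermarks. TwoInputWatermarkModel is a hypothetical class, not Flink's operator code, and it models only watermarks, not record timestamps.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical model (not Flink's API) of watermark forwarding in a
 * two-input operator: output watermark = min of the latest input
 * watermarks, never moving backwards and never adding extra delay.
 */
public class TwoInputWatermarkModel {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long outputWatermark = Long.MIN_VALUE;

    public long onWatermark1(long wm) { input1Watermark = wm; return advance(); }
    public long onWatermark2(long wm) { input2Watermark = wm; return advance(); }

    // Combine the inputs by taking the minimum; keep the output monotonic.
    private long advance() {
        long combined = Math.min(input1Watermark, input2Watermark);
        outputWatermark = Math.max(outputWatermark, combined);
        return outputWatermark;
    }

    public static void main(String[] args) {
        long h = TimeUnit.HOURS.toMillis(1);
        // Sources A, B and D each use a 3-hour bound; latest event time is 10:00.
        long wmA = 10 * h - 3 * h, wmB = 10 * h - 3 * h, wmD = 10 * h - 3 * h;

        TwoInputWatermarkModel joinAB = new TwoInputWatermarkModel();
        joinAB.onWatermark1(wmA);
        long wmC = joinAB.onWatermark2(wmB);  // 07:00

        TwoInputWatermarkModel joinCD = new TwoInputWatermarkModel();
        joinCD.onWatermark1(wmC);
        long wmE = joinCD.onWatermark2(wmD);  // still 07:00, not 04:00

        System.out.println(wmE / h); // 7
    }
}
```

With all three sources bounded by 3 hours, E's watermark in this model stays 3 hours behind the latest event time rather than 6, because neither connect step subtracts anything from the watermarks it receives.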

2018-04-25 20:20 GMT+02:00 Chengzhi Zhao <w....@gmail.com>:

> Hi, everyone,
>
> I am trying to build a join-like pipeline using Flink's connect operator and
> CoProcessFunction. I have a use case where I need to connect 3+ streams, so I
> have something like this:
>
> A ---+
>      +--> C --+
> B ---+        +--> E
>           D --+
>
> Streams A and B are connected first, with the low watermark 3 hours late.
> After the data has been emitted (the output stream C), a new stream D is
> connected to C and E is emitted as the final output. I was wondering how the
> downstream watermark should be defined. Should I give stream C a new watermark
> with a 3-hour delay again? Or, when I connect stream D, will everything be 6
> hours late on the low watermark?
>
> I am using BoundedOutOfOrdernessGenerator[1] with a maxOutOfOrderness of 3 hours.
>
> Thanks for your tips and help in advance.
>
> Best,
> Chengzhi
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks
>