Posted to user@flink.apache.org by Chengzhi Zhao <w....@gmail.com> on 2018/04/13 15:48:39 UTC
Slow watermark advances
Hi Flink community,
I had an issue with slow watermark advances and need some help here. Here
is what happened: I have two streams, A and B, which are connected and joined
in a co-process function, and A also feeds a separate output.
A --> Output
B --> (Connect A) --> Output
I used the BoundedOutOfOrdernessGenerator [1] on both stream A and stream B
with a 2-hour delay. The low watermark of A and of its output sink stays
within the 2-hour window; however, the co-process operator's low watermark
ends up 10 hours late.
My setup uses the file system as the source: every 15 minutes, files are
dropped into a directory and Flink picks them up from there.
Please advise; thanks in advance!
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks
Best,
Chengzhi
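For context on [1]: the periodic generator described there remembers the highest event timestamp seen so far and emits a watermark that trails it by a fixed out-of-orderness bound (2 hours in this setup). Below is a minimal, Flink-free sketch of that logic. The names `BoundedOutOfOrdernessSketch`, `BoundedOutOfOrdernessDemo` and `watermarkAfter`, and the use of a plain `Long` instead of Flink's `Watermark` object, are illustrative assumptions; in Flink 1.4 this logic would live in an `AssignerWithPeriodicWatermarks` implementation.

```scala
// Hypothetical, Flink-free sketch of a bounded-out-of-orderness watermark generator.
// Timestamps and watermarks are epoch milliseconds.
final class BoundedOutOfOrdernessSketch(maxOutOfOrdernessMs: Long) {
  // Start at Long.MinValue + bound so that subtracting the bound below cannot underflow.
  private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

  // Called per element: remember the largest event timestamp seen so far.
  def extractTimestamp(eventTimeMs: Long): Long = {
    currentMaxTimestamp = math.max(eventTimeMs, currentMaxTimestamp)
    eventTimeMs
  }

  // Called periodically: the watermark trails the max timestamp by the bound.
  def currentWatermark: Long = currentMaxTimestamp - maxOutOfOrdernessMs
}

object BoundedOutOfOrdernessDemo {
  val TwoHoursMs: Long = 2L * 60 * 60 * 1000

  // Feed event timestamps in arrival order and return the resulting watermark.
  def watermarkAfter(eventTimesMs: Seq[Long]): Long = {
    val gen = new BoundedOutOfOrdernessSketch(TwoHoursMs)
    eventTimesMs.foreach(gen.extractTimestamp)
    gen.currentWatermark
  }
}
```

For example, feeding event times of 10:00, 12:00 and 11:00 (as milliseconds) yields a watermark of 10:00: the out-of-order 11:00 event does not move the watermark back, and nothing advances it past "latest timestamp minus 2 hours".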
Re: Slow watermark advances
Posted by Xingcan Cui <xi...@gmail.com>.
Yes, Chengzhi. That’s exactly what I mean. But you should be careful with the semantics of your pipeline. The problem cannot be gracefully solved if there’s a natural time offset between the two streams.
Best, Xingcan
> On 14 Apr 2018, at 4:00 AM, Chengzhi Zhao <w....@gmail.com> wrote:
>
> Hi Xingcan,
>
> Thanks for your quick response; now I understand it better. To clarify, do you mean I should add a static time offset when I override the extractTimestamp function?
>
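One way to see why the semantics need care: a static offset changes the event time of every record on the shifted stream, not just that stream's watermark, so any event-time window or timer downstream sees different times. A minimal sketch, assuming tumbling windows aligned to the epoch; the names `OffsetWindowShift`, `windowStart` and `windowsBeforeAndAfter` are hypothetical.

```scala
// Hypothetical illustration: a static timestamp offset also moves events
// between event-time windows, not only the watermark.
object OffsetWindowShift {
  val OneHourMs: Long = 60L * 60 * 1000

  // Start of the epoch-aligned tumbling window (of the given size) containing timestampMs.
  // floorMod keeps the result correct for negative timestamps as well.
  def windowStart(timestampMs: Long, windowSizeMs: Long): Long =
    timestampMs - Math.floorMod(timestampMs, windowSizeMs)

  // Window start of an event before and after adding a static offset to its timestamp.
  def windowsBeforeAndAfter(timestampMs: Long, offsetMs: Long, windowSizeMs: Long): (Long, Long) =
    (windowStart(timestampMs, windowSizeMs), windowStart(timestampMs + offsetMs, windowSizeMs))
}
```

With 1-hour windows, an event stamped 10:30 belongs to [10:00, 11:00); after a +1 hour shift it belongs to [11:00, 12:00), so it would be grouped or joined with different events than before.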
Re: Slow watermark advances
Posted by Chengzhi Zhao <w....@gmail.com>.
Hi Xingcan,
Thanks for your quick response; now I understand it better. To clarify,
do you mean I should add a static time offset when I override the
extractTimestamp function?
For example,
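// Inside the BoundedOutOfOrdernessGenerator from the Flink docs (requires: import scala.math.max)
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
  // add a static 1-hour offset (3,600,000 ms) to this stream's event time
  val timestamp = element.getCreationTime() + 3600000L
  currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
  timestamp
}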
Appreciate your help!
Best,
Chengzhi
On Fri, Apr 13, 2018 at 12:49 PM, Xingcan Cui <xi...@gmail.com> wrote:
> Hi Chengzhi,
>
> Currently, the watermarks of the two inputs of a connected stream are
> forcibly synchronized, i.e., the operator's watermark is the minimum of the
> two input watermarks, so it is decided by the stream with the larger delay.
> Thus window triggers (and event-time timers) are also held back by this
> mechanism.
>
> As a workaround, you could try to add (or subtract) a static time offset
> to the timestamps of one of your streams, which brings their event times
> “closer” to each other.
>
> Best,
> Xingcan
Re: Slow watermark advances
Posted by Xingcan Cui <xi...@gmail.com>.
Hi Chengzhi,
Currently, the watermarks of the two inputs of a connected stream are forcibly synchronized, i.e., the operator's watermark is the minimum of the two input watermarks, so it is decided by the stream with the larger delay. Thus window triggers (and event-time timers) are also held back by this mechanism.
As a workaround, you could try to add (or subtract) a static time offset to the timestamps of one of your streams, which brings their event times “closer” to each other.
Best,
Xingcan
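The mechanism described above can be simulated without Flink: a two-input operator, such as a co-process function on connected streams, only advances its watermark to the minimum of its two input watermarks. The sketch below uses hypothetical names (`TwoInputWatermark`, `combined`, `combinedWithOffsetOnB`) and assumes, as with a bounded-out-of-orderness generator, that shifting every timestamp of stream B by a constant shifts B's watermark by the same constant.

```scala
// Hypothetical simulation of how a two-input operator (e.g. a co-process
// function on connected streams) derives its watermark from its inputs.
object TwoInputWatermark {
  val OneHourMs: Long = 60L * 60 * 1000

  // The operator can only advance to a time that both inputs have reached.
  def combined(watermarkA: Long, watermarkB: Long): Long =
    math.min(watermarkA, watermarkB)

  // Same, after shifting stream B's timestamps (and hence, by assumption,
  // its watermark) by a static offsetMs.
  def combinedWithOffsetOnB(watermarkA: Long, watermarkB: Long, offsetMs: Long): Long =
    combined(watermarkA, watermarkB + offsetMs)
}
```

With illustrative numbers loosely matching the thread (A's watermark 2 hours behind some reference time, B's 10 hours behind), `combined` returns B's watermark, which would explain a co-process that lags by about 10 hours while A alone does not; shifting B by +8 hours brings the combined watermark up to A's.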