You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by lxk <lx...@163.com> on 2022/07/06 05:35:38 UTC

Flink interval join 水印疑问

在使用interval join的时候有一些疑问,希望大家能够帮忙解答一下
https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png
官方文档中说会从两个流中的 timestamp  中取最大值,看了下源码确实是这样
https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png
我的问题是:
1.这里的timestamp和watermark有什么区别?
2.interval join中watermark是怎么计算的?两个流取最大的timestamp之后,watermark跟这个最大的timestamp是否有某种联系

Re: Re: Flink interval join 水印疑问

Posted by yidan zhao <hi...@gmail.com>.
你代码没格式化,不方便看。
首先我们讨论的都是eventtime场景,processtime场景下watermark没用。

假设Record代表一次请求信息,那么可以把该请求的发生时间作为该Record的eventime,也就是这个Record的timestamp。
现在要统计每5分钟的pv信息,就需要依据这个Record的timestamp决定划分到哪个窗口,比如划分到了 yyyyMMdd 8:30 ~
yyyyMMdd 8:35 这个窗口。
考虑到数据可以迟到,任何时刻,比如即使到了 8.40分,也可能再出现 8.33 分的数据。
但是,flink不可能无限等待下午,8.35的窗口必须在某个时机闭合并输出该窗口的统计结果到下游。
如果采用 EventTimeTrigger
的话,这个决定闭合窗口的时机就是:当watermark达到窗口的maxTimestamp。该窗口的maxTimestamp就是 8.35
分那个点。
watermark则是根据用户选择的策略生成,比如在source部分,根据当前task看到的最大的Record的timestamp,减去一个
maxOutOfOrderness 即为 watermark。这个 maxOutOfOrderness 就是允许数据乱序的程度。

至于watermark的传播,简单说就是向后广播即可。
双流 join 情况的话,需要取小,取2个watermark的更小的那个。
对于从同一个流进入的watermark是取大(这个其实逻辑正确的话,生成端就决定了递增了,接收端做个判定只是保险,避免出现watermark倒退而已)。

lxk <lx...@163.com> 于2022年7月7日周四 19:57写道:
>
> 按照这个说法,那么timestamp和watermark其实没有关系。
> 但是我看到有关帖子里说:双流join里存储的mapstate<Long,StreamRecord>。
> 而StreamRecord和watermark都是继承于streamelement,Flink会替换StreamRecord 对象中的Timestamp,如果 根据当前事件的Timestamp 生成的Watermark 大于上一次的Watermark,就发出新的Watermark。
> 具体代码在 TimestampsAndPunctuatedWatermarksOperator.processElement。
> @OverridepublicvoidprocessElement(StreamRecord<T> element)throws Exception { finalTvalue= element.getValue(); // 调用 用户实现的 extractTimestamp 获取新的TimestampfinallongnewTimestamp= userFunction.extractTimestamp(value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); // 用新Timestamp 替换StreamRecord中的旧Timestamp output.collect(element.replace(element.getValue(), newTimestamp)); // 调用 用户实现的 checkAndGetNextWatermark 方法获取下一个WatermarkfinalWatermarknextWatermark= userFunction.checkAndGetNextWatermark(value, newTimestamp); // 如果下一个Watermark 大于当前Watermark,就发出新的Watermarkif (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) { currentWatermark = nextWatermark.getTimestamp(); output.emitWatermark(nextWatermark); } }
> 以上是我看见的帖子中的相关内容
> 如果上述说法不对的话,那么在双流join中,watermark是怎么流转的?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-07-07 10:03:00,"yidan zhao" <hi...@gmail.com> 写道:
> >timestamp是为每个element(输入的记录)赋值的一个时间戳。
> >watermark是从source部分生成的水印个,然后向后传播。
> >
> >以分窗口为例,数据记录的timestamp用于决定数据划分入哪个窗口。
> >watermark用于决定窗口啥时候闭合,比如窗口是0-5s,那么当watermark达到5s的时候,窗口就会闭合。
> >
> >考虑数据不一定能及时到达,可以让watermark=max(timestamp)-30s。30s即可容忍给的数据乱序的程度。
> >
> >lxk <lx...@163.com> 于2022年7月6日周三 13:36写道:
> >>
> >> 在使用interval join的时候有一些疑问,希望大家能够帮忙解答一下
> >> https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png
> >> 官方文档中说会从两个流中的 timestamp  中取最大值,看了下源码确实是这样
> >> https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png
> >> 我的问题是:
> >> 1.这里的timestamp和watermark有什么区别?
> >> 2.interval join中watermark是怎么计算的?两个流取最大的timestamp之后,watermark跟这个最大的timestamp是否有某种联系
>
>

Re:Re: Flink interval join 水印疑问

Posted by lxk <lx...@163.com>.
按照这个说法,那么timestamp和watermark其实没有关系。
但是我看到有关帖子里说:双流join里存储的mapstate<Long,StreamRecord>。
而StreamRecord和watermark都是继承于streamelement,Flink会替换StreamRecord 对象中的Timestamp,如果 根据当前事件的Timestamp 生成的Watermark 大于上一次的Watermark,就发出新的Watermark。
具体代码在 TimestampsAndPunctuatedWatermarksOperator.processElement。
@OverridepublicvoidprocessElement(StreamRecord<T> element)throws Exception { finalTvalue= element.getValue(); // 调用 用户实现的 extractTimestamp 获取新的TimestampfinallongnewTimestamp= userFunction.extractTimestamp(value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); // 用新Timestamp 替换StreamRecord中的旧Timestamp output.collect(element.replace(element.getValue(), newTimestamp)); // 调用 用户实现的 checkAndGetNextWatermark 方法获取下一个WatermarkfinalWatermarknextWatermark= userFunction.checkAndGetNextWatermark(value, newTimestamp); // 如果下一个Watermark 大于当前Watermark,就发出新的Watermarkif (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) { currentWatermark = nextWatermark.getTimestamp(); output.emitWatermark(nextWatermark); } }
以上是我看见的帖子中的相关内容
如果上述说法不对的话,那么在双流join中,watermark是怎么流转的?














在 2022-07-07 10:03:00,"yidan zhao" <hi...@gmail.com> 写道:
>timestamp是为每个element(输入的记录)赋值的一个时间戳。
>watermark是从source部分生成的水印个,然后向后传播。
>
>以分窗口为例,数据记录的timestamp用于决定数据划分入哪个窗口。
>watermark用于决定窗口啥时候闭合,比如窗口是0-5s,那么当watermark达到5s的时候,窗口就会闭合。
>
>考虑数据不一定能及时到达,可以让watermark=max(timestamp)-30s。30s即可容忍给的数据乱序的程度。
>
>lxk <lx...@163.com> 于2022年7月6日周三 13:36写道:
>>
>> 在使用interval join的时候有一些疑问,希望大家能够帮忙解答一下
>> https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png
>> 官方文档中说会从两个流中的 timestamp  中取最大值,看了下源码确实是这样
>> https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png
>> 我的问题是:
>> 1.这里的timestamp和watermark有什么区别?
>> 2.interval join中watermark是怎么计算的?两个流取最大的timestamp之后,watermark跟这个最大的timestamp是否有某种联系



Re: Flink interval join 水印疑问

Posted by yidan zhao <hi...@gmail.com>.
timestamp是为每个element(输入的记录)赋值的一个时间戳。
watermark是从source部分生成的水印个,然后向后传播。

以分窗口为例,数据记录的timestamp用于决定数据划分入哪个窗口。
watermark用于决定窗口啥时候闭合,比如窗口是0-5s,那么当watermark达到5s的时候,窗口就会闭合。

考虑数据不一定能及时到达,可以让watermark=max(timestamp)-30s。30s即可容忍给的数据乱序的程度。

lxk <lx...@163.com> 于2022年7月6日周三 13:36写道:
>
> 在使用interval join的时候有一些疑问,希望大家能够帮忙解答一下
> https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png
> 官方文档中说会从两个流中的 timestamp  中取最大值,看了下源码确实是这样
> https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png
> 我的问题是:
> 1.这里的timestamp和watermark有什么区别?
> 2.interval join中watermark是怎么计算的?两个流取最大的timestamp之后,watermark跟这个最大的timestamp是否有某种联系