Posted to user-zh@flink.apache.org by crazy <24...@qq.com.INVALID> on 2023/03/24 16:28:26 UTC

Flink watermark: question about out-of-order data

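Hi everyone, in the program below, can the out-of-orderness bound set with forBoundedOutOfOrderness in the Flink watermark strategy cause data loss? For example, if a record's event time is more than 5 ms out of order, will that record still make it into streamTS?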


SingleOutputStreamOperator<ClickEvent> streamTS = mySource.assignTimestampsAndWatermarks(
        WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(5))
                .withTimestampAssigner(
                        new SerializableTimestampAssigner<ClickEvent>() {
                            @Override
                            public long extractTimestamp(ClickEvent event, long recordTimestamp) {
                                return event.getDateTime();
                            }
                        })
);
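A minimal plain-Java simulation (no Flink dependency) can help answer this. It models a bounded-out-of-orderness watermark generator the way Flink's built-in one computes it: the watermark is the highest timestamp seen so far minus the bound minus 1 ms. It is a simplification, since it recomputes the watermark after every event while Flink emits watermarks periodically. The class BoundedOutOfOrdernessSim and its methods are hypothetical names for illustration; the point is that the generator only tracks timestamps and passes every record through.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java simulation of a bounded-out-of-orderness watermark
// generator. Simplified: the watermark is recomputed after every event
// instead of being emitted periodically as Flink does.
class BoundedOutOfOrdernessSim {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSim(long boundMillis) {
        this.boundMillis = boundMillis;
        // Start low enough that the initial watermark cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // Only the running maximum timestamp is updated; no record is filtered.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Watermark = highest timestamp seen so far - bound - 1 ms.
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    // Feeds all timestamps through the generator. Every record is added to
    // 'forwarded'; records whose timestamp is already <= the watermark when
    // they arrive are also collected in 'late'. Returns the watermark after
    // each event.
    static List<Long> run(long[] timestamps, long boundMillis,
                          List<Long> forwarded, List<Long> late) {
        BoundedOutOfOrdernessSim gen = new BoundedOutOfOrdernessSim(boundMillis);
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts <= gen.currentWatermark()) {
                late.add(ts);
            }
            forwarded.add(ts); // nothing is dropped at this stage
            gen.onEvent(ts);
            watermarks.add(gen.currentWatermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        List<Long> forwarded = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        List<Long> watermarks = run(new long[] {100, 110, 103, 90}, 5, forwarded, late);
        System.out.println("watermarks: " + watermarks);
        System.out.println("forwarded:  " + forwarded);
        System.out.println("late:       " + late);
    }
}
```

With a 5 ms bound and the input 100, 110, 103, 90, the watermark reaches 104 after the second record, so 103 and 90 arrive behind it, yet all four records are still forwarded.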

Re: Flink watermark: question about out-of-order data

Posted by Shammon FY <zj...@gmail.com>.
Hi

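The watermark strategy (forBoundedOutOfOrderness together with withTimestampAssigner) only defines how timestamps are extracted and how watermarks are generated; it does not filter the data stream, so records that are more than 5 ms out of order still reach streamTS. Whether such late records are processed is decided when you define the window: allowedLateness sets how late an event may arrive, and a record that arrives after the watermark has passed the end of its window plus allowedLateness is dropped.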
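The decision about late records is made by the window operator, not by the watermark strategy. Below is a minimal plain-Java sketch (no Flink dependency) of that decision for a tumbling event-time window, modeled on the checks Flink's window operator and default event-time trigger perform. It assumes non-negative timestamps, a zero window offset and a single fixed watermark value; the class LatenessSim and its Outcome values are hypothetical names. In a real job the corresponding settings are .window(...) followed by .allowedLateness(...) on the windowed stream, and .sideOutputLateData(...) can capture records that would otherwise be dropped.

```java
// Hypothetical plain-Java sketch of how an event-time tumbling window with
// allowedLateness treats a record, given the current watermark.
// Simplified: non-negative timestamps, zero window offset.
class LatenessSim {
    enum Outcome { ON_TIME, LATE_FIRING, DROPPED }

    static Outcome classify(long ts, long windowSize, long allowedLateness, long watermark) {
        long windowStart = ts - (ts % windowSize);        // assumes ts >= 0
        long windowMaxTs = windowStart + windowSize - 1;   // last timestamp in the window
        if (windowMaxTs + allowedLateness <= watermark) {
            return Outcome.DROPPED;      // window state already cleaned up
        }
        if (windowMaxTs <= watermark) {
            return Outcome.LATE_FIRING;  // window already fired; with the default
                                         // event-time trigger it fires again
        }
        return Outcome.ON_TIME;          // window has not fired yet
    }

    public static void main(String[] args) {
        long watermark = 104; // e.g. highest timestamp 110 - bound 5 - 1
        System.out.println(classify(103, 10, 3, watermark));  // ON_TIME
        System.out.println(classify(90, 10, 3, watermark));   // DROPPED
        System.out.println(classify(95, 10, 10, watermark));  // LATE_FIRING
    }
}
```

With the watermark at 104 and 10 ms windows, record 103 falls in [100, 110) and is still on time even though it is behind the watermark. Record 90 falls in [90, 100), which with 3 ms of lateness was cleaned up once the watermark reached 102, so it is dropped; with 10 ms of lateness the same window would still accept it and fire again.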

Best,
Shammon FY
