You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 1530130567 <15...@qq.com> on 2019/12/25 01:56:51 UTC

回复: 关于flink窗口是否正确关闭的问题

大佬好:
我昨天看了一下metric,确实是recordsIn&gt;recordsOut
代码里就是用了一个window然后配processfunction,也没有任何的filter操作。
代码如下:
.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(1)))
.process(new ProcessWindowFunction<Row, Row, Integer, TimeWindow&gt;() {
    @Override
    public void process(Integer integer, Context context, Iterable<Row&gt; elements, Collector<Row&gt; out) {
        for (Row element : elements) {
                out.collect(element);
        }
    }
})




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"jingjing bai"<baijingjing7449@gmail.com&gt;;
发送时间:&nbsp;2019年12月24日(星期二) 晚上9:18
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: 关于flink窗口是否正确关闭的问题



窗口不会提前关闭,请查看下metircs是否有数据丢弃,


1530130567 <1530130567@qq.com&gt; 于2019年12月24日周二 下午8:46写道:

&gt; 各位大佬好:
&gt; &amp;nbsp; 最近在使用flink stream api处理数据,逻辑是非常简单的ETL操作
&gt; &amp;nbsp; 我自己定义的一个1分钟的tumble窗口,watermark是10s,当处在流量高峰时段时,发现下游出现了数据丢失的问题。
&gt; &amp;nbsp; 举个例子:我上游topic 5000/s,下游接受数据的topic只有4000/s
&gt; &amp;nbsp;
&gt; 在流量低谷时就没有这个问题,而且我把窗口去掉后也没有这个问题,是否是窗口被提前关闭了呢?导致我下游的processfunction还没处理完?
&gt; &amp;nbsp; ps:我加大了并行度还是不行

Re: 关于flink窗口是否正确关闭的问题

Posted by Jary Zhen <ja...@gmail.com>.
使用基于EventTime 的 watermark处理数据通常会碰到两这么两种情况:
1.  数据因为乱序,迟到严重,会被丢弃,这个可以查看Side Out API [1]
2.
 数据产生的事件时间比当前系统时间大,我称之为“超自然数据”,比如当前系统时间是10:37:55,但数据产生的事件时间可能是10:38:55,那么一旦有这类数据到达,将会使窗口提前触发计算,导致正常数据被当做迟到数据,因而被丢弃,这个处理方式是在assignWaterMark
之前过滤掉。
3. 建议: 如果是简单的ETL,尽量不要用EventTime 来处理数据

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html

On Wed, 25 Dec 2019 at 09:57, 1530130567 <15...@qq.com> wrote:

> 大佬好:
> 我昨天看了一下metric,确实是recordsIn&gt;recordsOut
> 代码里就是用了一个window然后配processfunction,也没有任何的filter操作。
> 代码如下:
>
> .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(1)))
> .process(new ProcessWindowFunction<Row, Row, Integer, TimeWindow&gt;() {
>     @Override
>     public void process(Integer integer, Context context, Iterable<Row&gt;
> elements, Collector<Row&gt; out) {
>         for (Row element : elements) {
>                 out.collect(element);
>         }
>     }
> })
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"jingjing bai"<baijingjing7449@gmail.com&gt;;
> 发送时间:&nbsp;2019年12月24日(星期二) 晚上9:18
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: 关于flink窗口是否正确关闭的问题
>
>
>
> 窗口不会提前关闭,请查看下metircs是否有数据丢弃,
>
>
> 1530130567 <1530130567@qq.com&gt; 于2019年12月24日周二 下午8:46写道:
>
> &gt; 各位大佬好:
> &gt; &amp;nbsp; 最近在使用flink stream api处理数据,逻辑是非常简单的ETL操作
> &gt; &amp;nbsp;
> 我自己定义的一个1分钟的tumble窗口,watermark是10s,当处在流量高峰时段时,发现下游出现了数据丢失的问题。
> &gt; &amp;nbsp; 举个例子:我上游topic 5000/s,下游接受数据的topic只有4000/s
> &gt; &amp;nbsp;
> &gt;
> 在流量低谷时就没有这个问题,而且我把窗口去掉后也没有这个问题,是否是窗口被提前关闭了呢?导致我下游的processfunction还没处理完?
> &gt; &amp;nbsp; ps:我加大了并行度还是不行