You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by x <35...@qq.com> on 2020/10/19 10:23:25 UTC

回复: 求助:如何处理数据不连续导致状态无法清理

也就是说需要用KeyedProcessFunction代替ProcessWindowFunction.




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <qcx978132955@gmail.com&gt;;
发送时间:&nbsp;2020年10月19日(星期一) 下午3:28
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: 求助:如何处理数据不连续导致状态无法清理



Hi
&nbsp;&nbsp;&nbsp;&nbsp; 或许你可以使用 timer 来进行兜底,注册一个未来某个时间的 timer,然后 timer 触发的时候把 state 清理掉
Best,
Congxian


x <35907418@qq.com&gt; 于2020年10月19日周一 下午2:55写道:

&gt; 版本为v1.10.1
&gt; 使用AggregateFunction+ProcessWindowFunction的方式,进行实时统计,ProcessWindowFunction中涉及状态的累计运算,使用事件时间,按维度+日期分区,按分钟开窗,跨天需要将状态清除,避免状态越来越大。状态清除的逻辑,覆盖ProcessWindowFunction的clear方法,判断窗口开始时间是否为“23:59:00”,如下:override
&gt; def clear(ctx: Context): Unit = {
&gt;&nbsp;&nbsp; val dt = new SimpleDateFormat("HH:mm:00").format(ctx.window.getStart)
&gt;&nbsp;&nbsp; if(dt.equals("23:59:00")){
&gt;
&gt; state.clear()遇到的一个问题是,开窗前,keyBy分区时,有的key对应的数据不连续,十分稀疏,可能会出现每天的最后一个窗口没有数据,导致无法触发状态清理逻辑,导致总状态数据越来越大的现象,请问各位老师,有什么好的办法,可以避免这种情况吗?