Posted to user-zh@flink.apache.org by 守护 <34...@qq.com> on 2020/03/31 08:31:01 UTC

Re: How can state be cleared effectively in a ProcessWindowFunction?

Thanks for your reply.


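The condition if(stateDate.equals("") || stateDate.equals(date)) in the code does reach pv_st.clear(). However: (1) in the final output, the state in pv_st has not been cleared and keeps accumulating; (2) after state.clear(), the next read returns null, and the snippet indeed has no null check but updates the value directly with pv_st.update(pv_st.value() + c_st). Could this be related?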








------------------ Original message ------------------
From: "Yun Tang" <myasuka@live.com>;
Sent: Tuesday, March 31, 2020, 3:59 PM
To: "user-zh" <user-zh@flink.apache.org>;

Subject: Re: How can state be cleared effectively in a ProcessWindowFunction?



Hi

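From the code, I can't tell where the variable stateDate in if(stateDate.equals("") || stateDate.equals(date)) gets its value, so it is unclear whether your condition actually takes effect.
Second, after state.clear(), the next read returns null, and I don't see a check for that anywhere in the snippet.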
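A sketch of why the stateDate check may not behave as intended. It assumes stateDate is an ordinary var field of the function, since its declaration is not in the snippet. Such a field is shared by every key that the same parallel subtask processes, while pv_st is scoped to the current key, and the field is not included in checkpoints. The pure-Scala model below has no Flink dependency. SharedFieldModel, fire and stored are hypothetical names, and a mutable Map stands in for keyed state. It replays the posted if/else for two keys across one day boundary.

```scala
import scala.collection.mutable

// Hypothetical model of the posted logic (not Flink code):
// `pvState` stands in for the keyed ValueState pv_st (one slot per key),
// while `stateDate` is a single field shared by all keys, like a plain member var.
final class SharedFieldModel {
  private val pvState = mutable.Map.empty[String, Long]
  private var stateDate = ""

  /** One trigger firing for `key` on `date` that adds `count`; returns the emitted total. */
  def fire(key: String, date: String, count: Long): Long = {
    val total = pvState.getOrElse(key, 0L) + count // pv_st.update(pv_st.value() + c_st)
    pvState(key) = total
    if (stateDate == "" || stateDate == date) {
      stateDate = date                             // out.collect(jsonStr) happens here
    } else {
      pvState.remove(key)                          // pv_st.clear(): only the current key
      stateDate = date                             // now every other key sees "same day"
    }
    total                                          // value emitted by out.collect
  }

  /** Value currently held for `key` (0 when cleared or never set). */
  def stored(key: String): Long = pvState.getOrElse(key, 0L)
}
```

In this model, key a is cleared only after its first output of the new day has already been emitted with yesterday's total included. Key b is never cleared, because the shared field already holds the new date by the time b fires.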

Best regards,
唐云
________________________________
From: 守护 <346531110@qq.com>
Sent: Tuesday, March 31, 2020 12:33
To: user-zh <user-zh@flink.apache.org>
Subject: How can state be cleared effectively in a ProcessWindowFunction?

Hi all,


--Version
FLINK 1.10.0 ON YARN


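--Steps
1. Define a .window(TumblingProcessingTimeWindows.of(Time.days(1))) window.
2. Define a trigger: .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10))).
3. Define a new ProcessWindowFunction() that computes a result at each fixed interval. The business requirement is to count each day's data starting from 00:00, then clear it and start over on the next day.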
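Tumbling windows are aligned to the epoch (1970-01-01T00:00Z). As a result, TumblingProcessingTimeWindows.of(Time.days(1)) rolls over at UTC midnight, which is 08:00 in UTC+8, not at local midnight. The Flink documentation suggests an offset for China, e.g. TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)). The pure-Scala sketch below re-implements the window-start arithmetic as we understand it from Flink's TimeWindow.getWindowStartWithOffset. Treat it as an approximation rather than a copy of Flink's code. It checks which daily window an event at 00:30 Beijing time falls into. WindowAlignment and its methods are hypothetical names.

```scala
import java.time.{Instant, LocalDateTime, ZoneId}

object WindowAlignment {
  private val DayMillis: Long = 24L * 60 * 60 * 1000
  private val Shanghai: ZoneId = ZoneId.of("Asia/Shanghai") // assumed business time zone

  // Start of the tumbling window containing `ts` (all values in epoch milliseconds).
  def windowStart(ts: Long, size: Long, offset: Long): Long =
    ts - (ts - offset + size) % size

  def dailyStartNoOffset(ts: Long): Long = windowStart(ts, DayMillis, 0L)
  def dailyStartMinus8h(ts: Long): Long  = windowStart(ts, DayMillis, -8L * 60 * 60 * 1000)

  // Helpers for converting between Beijing local time and epoch milliseconds.
  def toShanghai(ms: Long): LocalDateTime =
    LocalDateTime.ofInstant(Instant.ofEpochMilli(ms), Shanghai)
  def fromShanghai(ldt: LocalDateTime): Long =
    ldt.atZone(Shanghai).toInstant.toEpochMilli
}
```

With no offset, an event at 2020-03-31 00:30 Beijing time belongs to the window that opened at 2020-03-30 08:00 Beijing time. With the -8 hour offset, it belongs to the window that opened at 2020-03-31 00:00. The posted code, by contrast, derives `date` from new Date() formatted in the JVM's default time zone. If that zone is UTC+8, the date string and the window would roll over eight hours apart.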
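--Problem
In the new ProcessWindowFunction() I create a ValueState and want it to be cleared at 00:00 on the next day so that counting starts over. However, the ValueState is not cleared; it keeps accumulating on top of the previous day's total. Where should the .clear() call be placed for it to take effect?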
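One likely contributor to the ever-growing total: ContinuousProcessingTimeTrigger returns TriggerResult.FIRE, not FIRE_AND_PURGE. The window therefore keeps every element since it opened, and each 10-minute firing passes the whole day so far to process. Adding that count to pv_st on every firing adds the same elements again and again. The pure-Scala model below replays three firings with 3, 2 and 4 new "listentime" events. It has no Flink dependency, and CumulativeFiringModel and replay are hypothetical names.

```scala
object CumulativeFiringModel {
  /**
   * `batches(i)` = number of matching events that arrived during the i-th interval.
   * Returns, after each firing, (value accumulated in pv_st, elements held by the window).
   */
  def replay(batches: Seq[Long]): Seq[(Long, Long)] = {
    var windowContents = 0L // FIRE does not purge, so the window keeps growing
    var pvState = 0L        // stand-in for the keyed ValueState pv_st
    batches.map { newEvents =>
      windowContents += newEvents
      val cSt = windowContents // `process` counts every element still in the window
      pvState += cSt           // pv_st.update(pv_st.value() + c_st)
      (pvState, windowContents)
    }
  }
}
```

After three firings the window holds 9 matching events, the actual count for the day so far. The modelled pv_st holds 3 + 5 + 9 = 17. Under this reading, the count of matching elements in the current firing is already the day's running total, and the window's contents are dropped when the daily window is cleaned up.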



--Partial code


  .window(TumblingProcessingTimeWindows.of(Time.days(1)))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
  .process(new ProcessWindowFunction[(String, String, Long), String, Tuple, TimeWindow] {

    private var pv_st: ValueState[Long] = _

    override def open(parameters: Configuration): Unit = {
      pv_st = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pv_stCount", classOf[Long]))
    }

    override def process(key: Tuple, context: Context, elements: Iterable[(String, String, Long)], out: Collector[String]): Unit = {
      var c_st = 0

      val elementsIterator = elements.iterator
      // Iterate over the window data and get the unique word
      while (elementsIterator.hasNext) {
        val ac_name = elementsIterator.next()._2
        if (!ac_name.isEmpty && ac_name.equals("listentime")) {
          c_st += 1
        }
      }
      val time: Date = new Date()
      val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
      val date = dateFormat.format(time)

      // add current
      pv_st.update(pv_st.value() + c_st)

      var jsonStr = "" + key.getField(0) + "_" + date + "&" // start of the JSON payload
      jsonStr += "{" +
        "\"yesterday_foreground_play_pv\":\"" + pv_st.value() +
        "\"}"

      // Check whether the next day has started; if so, clear all state and start accumulating again
      if (stateDate.equals("") || stateDate.equals(date)) {
        stateDate = date
        out.collect(jsonStr)
      } else {
        out.collect(jsonStr)
        pv_st.clear()
        stateDate = date
      }
    }
  })
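Because ContinuousProcessingTimeTrigger fires without purging, each call to process already receives every element of the current daily window. A possible alternative to the keyed ValueState is therefore to count the matching elements directly and take the date label from the window instead of from new Date(). The function below is a hypothetical, Flink-free version of the body of process, not the author's code. In a real ProcessWindowFunction, windowStartMillis would come from context.window.getStart. The Asia/Shanghai zone is an assumption about the intended business day.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

object DailyCountSketch {
  // Assumed business time zone for the date label.
  private val DayFormat =
    DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("Asia/Shanghai"))

  /** Builds the same output string as the posted jsonStr, from the window's own contents. */
  def render(key: String, windowStartMillis: Long, elements: Iterable[(String, String, Long)]): String = {
    // `==` is null-safe in Scala, so no separate isEmpty/null check is needed.
    val count = elements.count { case (_, acName, _) => acName == "listentime" }
    val date = DayFormat.format(Instant.ofEpochMilli(windowStartMillis))
    s"""${key}_${date}&{"yesterday_foreground_play_pv":"$count"}"""
  }
}
```

Here the key is passed as a String for simplicity, whereas the posted code reads it with key.getField(0). Labelling the output with the window start keeps the date consistent with the window even if a firing runs a little after the day boundary.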

Re: Re: How can state be cleared effectively in a ProcessWindowFunction?

Posted by Yun Tang <my...@live.com>.
Hi

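I find it strange that your program runs at all when started without a checkpoint. Your value state descriptor does not define a default value, so calling #value() returns null. That means the first call to #update reads null from state, and yet the job still works, which is odd.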
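A possible explanation for why the job runs at all in Scala: ValueState[Long] is a Java generic, so pv_st.value() returns a boxed reference. The Scala compiler unboxes it with scala.runtime.BoxesRunTime.unboxToLong, which turns null into 0L instead of throwing. The stand-in class below mimics a state handle that returns null when empty, so the effect can be seen without Flink. NullableCell and NullUnboxingDemo are hypothetical names, not Flink's ValueState.

```scala
// Hypothetical stand-in for a Java-style state handle: value() yields null when empty.
final class NullableCell[T] {
  private var stored: Any = null
  def value(): T = stored.asInstanceOf[T] // T is erased, so a null reference passes through
  def update(v: T): Unit = stored = v
  def clear(): Unit = stored = null
}

object NullUnboxingDemo {
  // Mirrors `pv_st.update(pv_st.value() + c_st)`; the call site unboxes null to 0L.
  def addCount(cell: NullableCell[Long], cSt: Int): Long = {
    cell.update(cell.value() + cSt)
    cell.value()
  }
}
```

If this reading is right, the missing null check is probably not what keeps the old total alive in this Scala job. The same expression written in Java would throw a NullPointerException on the first read.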

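I suggest debugging it locally in your IDE. You could change the clear condition so that it does not wait until the next day, which should let you reproduce the problem locally.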
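One way to follow this suggestion is to stop calling new Date() inside the function and derive a "period" from an injected java.time.Clock and a configurable ChronoUnit instead. A unit test can then use minutes and a fixed clock, while production uses days. PeriodicCounter and its parameters are hypothetical names for illustration. The class models a single key's counter; in a Flink job, both fields would have to live in keyed state rather than in plain member variables.

```scala
import java.time.{Clock, Instant, ZoneId}
import java.time.temporal.ChronoUnit

// Hypothetical counter that resets whenever the clock enters a new period.
// `unit` = ChronoUnit.DAYS in production, ChronoUnit.MINUTES when debugging locally.
final class PeriodicCounter(unit: ChronoUnit, zone: ZoneId) {
  private var period: Option[Instant] = None // start of the period the count belongs to
  private var count = 0L

  def add(n: Long, clock: Clock): Long = {
    val current = clock.instant().atZone(zone).truncatedTo(unit).toInstant
    if (!period.contains(current)) { // first call or a new period: reset before adding
      period = Some(current)
      count = 0L
    }
    count += n
    count
  }
}
```

With ChronoUnit.MINUTES and Clock.fixed, two calls within the same minute accumulate, and a call one minute later starts again from zero. Because the reset happens before the new count is added, the first result of a new period never includes the previous period's total.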

Best regards,
唐云
________________________________
From: 守护 <34...@qq.com>
Sent: Tuesday, March 31, 2020 16:31
To: user-zh <us...@flink.apache.org>
Subject: Re: How can state be cleared effectively in a ProcessWindowFunction?
