You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by op <52...@qq.com> on 2020/06/11 06:30:38 UTC

BLinkPlanner sql join状态清理

大家好,最近发现一个问题
发现同一套代码,用oldPlanner设置IdleStateRetentionTime后join两边表的状态可以被定时清理,但是用blinkplanner同样的代码,运行却发现状态不会被清理,这是个bug吗?

Re: Re: 回复: BLinkPlanner sql join状态清理

Posted by 刘大龙 <ld...@zju.edu.cn>.
Hi, MiniBatch Agg目前没有实现State TTl,我提了个PR修复这个问题,参考https://github.com/apache/flink/pull/11830
@Jark,辛苦有空时帮忙reveiw一下代码,这个问题越来越多用户用户遇到了。


> -----原始邮件-----
> 发件人: "刘建刚" <li...@gmail.com>
> 发送时间: 2020-09-29 18:27:47 (星期二)
> 收件人: user-zh <us...@flink.apache.org>
> 抄送: 
> 主题: Re: 回复: BLinkPlanner sql join状态清理
> 
> miniBatch下是无法ttl的,这个是修复方案:https://github.com/apache/flink/pull/11830
> 
> Benchao Li <li...@apache.org> 于2020年9月29日周二 下午5:18写道:
> 
> > Hi Ericliuk,
> >
> > 这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
> > 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~
> >
> > Ericliuk <er...@gmail.com> 于2020年9月29日周二 下午4:59写道:
> >
> > > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> > > <
> > >
> > http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png
> > >
> > >
> > >
> > > 不太清楚为什么用了mini batch就没读取这个配置。
> > > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >


------------------------------

Best

Re: 回复: BLinkPlanner sql join状态清理

Posted by 刘建刚 <li...@gmail.com>.
miniBatch下是无法ttl的,这个是修复方案:https://github.com/apache/flink/pull/11830

Benchao Li <li...@apache.org> 于2020年9月29日周二 下午5:18写道:

> Hi Ericliuk,
>
> 这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
> 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~
>
> Ericliuk <er...@gmail.com> 于2020年9月29日周二 下午4:59写道:
>
> > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> > <
> >
> http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png
> >
> >
> >
> > 不太清楚为什么用了mini batch就没读取这个配置。
> > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> --
>
> Best,
> Benchao Li
>

Re: 回复: BLinkPlanner sql join状态清理

Posted by Benchao Li <li...@apache.org>.
Hi Ericliuk,

这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~

Ericliuk <er...@gmail.com> 于2020年9月29日周二 下午4:59写道:

> 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> <
> http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png>
>
>
> 不太清楚为什么用了mini batch就没读取这个配置。
> 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li

Re: 回复: BLinkPlanner sql join状态清理

Posted by Ericliuk <er...@gmail.com>.
我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
<http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png> 

不太清楚为什么用了mini batch就没读取这个配置。
一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: BLinkPlanner sql join状态清理

Posted by op <52...@qq.com>.
一开始用的Blinkplanner,试了几天状态都清理不掉,改成oldplanner后就可以了,版本是1.10




package test.table.sql


import java.util.Properties


import com.souhu.msns.huyou.PublicParams
import com.souhu.msns.huyou.utils.KafkaPbSchema
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.windowing.time.{Time =&gt; WindowTime}
import org.apache.flink.types.Row




object test {


&nbsp; def main(args: Array[String]): Unit = {


&nbsp; &nbsp; //----------------------------配置执行环境------------------------------------------------
&nbsp; &nbsp; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
&nbsp; &nbsp; bsEnv.setNumberOfExecutionRetries(1)
&nbsp; &nbsp; bsEnv.setParallelism(1)
&nbsp; &nbsp; //bsEnv.getConfig.setAutoWatermarkInterval(10000)
&nbsp; &nbsp; bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
&nbsp; &nbsp; bsEnv.setStateBackend(new FsStateBackend("hdfs://dc1:8020/user/msns/streaming/checkpoint/flink/Circ", true))
&nbsp; &nbsp; bsEnv.getCheckpointConfig.setCheckpointInterval(300000)
&nbsp; &nbsp; bsEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
&nbsp; &nbsp; bsEnv.setParallelism(3)
&nbsp; &nbsp; bsEnv.setNumberOfExecutionRetries(1)


&nbsp; &nbsp; //----------------------------配置TABLE环境------------------------------------------------


&nbsp; &nbsp; val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
&nbsp; &nbsp; val bstEnv = StreamTableEnvironment.create(bsEnv,setting)
&nbsp; &nbsp; val tConfig = bstEnv.getConfig
&nbsp; &nbsp; tConfig.setIdleStateRetentionTime(Time.minutes(10),Time.minutes(20))
&nbsp; &nbsp; val config = bstEnv.getConfig.getConfiguration()
&nbsp; &nbsp; config.setString("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
&nbsp; &nbsp; config.setString("table.exec.mini-batch.allow-latency", "5 s")
&nbsp; &nbsp; config.setString("table.exec.mini-batch.size", "5000")
&nbsp; &nbsp; config.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation
&nbsp; &nbsp; config.setString("table.optimizer.distinct-agg.split.enabled", "true")
&nbsp; &nbsp; //bstEnv.getConfig.setLocalTimeZone(ZoneId.of("Etc/GMT+8"))
&nbsp;&nbsp;


&nbsp; &nbsp; //----------------------------创建数据源和表------------------------------------------------
&nbsp; &nbsp; val kafkaProps = new Properties()
&nbsp; &nbsp; kafkaProps.setProperty("bootstrap.servers", PublicParams.brokers)
&nbsp; &nbsp; val source = ....
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .toTable(bstEnv,'userId,'createTime.rowtime,'action,'circleName,'flowName,'ts,'content,'feedid,'postfeedid,'sessionId)


&nbsp; &nbsp; bstEnv.createTemporaryView("source",source)


&nbsp; &nbsp; val q1=bstEnv.sqlQuery(
&nbsp; &nbsp; &nbsp; """select sessionId from source
&nbsp; &nbsp; &nbsp; &nbsp; |where sessionId is not null
&nbsp; &nbsp; &nbsp; &nbsp; |and action='P_TIMELINE'""".stripMargin)
&nbsp; &nbsp; &nbsp; q1.toAppendStream[Row].print("source")
&nbsp; &nbsp; &nbsp;bstEnv.createTemporaryView("sourcefeed",q1)
	&nbsp;
&nbsp; &nbsp; val q2=bstEnv.sqlQuery(
&nbsp; &nbsp; &nbsp; """select sessionId from source
&nbsp; &nbsp; &nbsp; &nbsp; |where sessionId is not null
&nbsp; &nbsp; &nbsp; &nbsp; |and action='V_TIMELINE_FEED'""".stripMargin)
&nbsp; &nbsp; bstEnv.createTemporaryView("postfeed",q2)
	
&nbsp; &nbsp; bstEnv.sqlQuery(
&nbsp; &nbsp; &nbsp; """
&nbsp; &nbsp; &nbsp; &nbsp; |select count(b.sessionId) from
&nbsp; &nbsp; &nbsp; &nbsp; |sourcefeed a
&nbsp; &nbsp; &nbsp; &nbsp; |join postfeed b
&nbsp; &nbsp; &nbsp; &nbsp; |on a.sessionId=b.sessionId
&nbsp; &nbsp; &nbsp; """.stripMargin).toRetractStream[Row].print("")




&nbsp; &nbsp; bstEnv.execute("")
&nbsp; }
}









------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Leonard Xu"<xbjtdcq@gmail.com&gt;;
发送时间:&nbsp;2020年6月11日(星期四) 下午2:40
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: BLinkPlanner sql join状态清理



Hi,

可以稳定复现吗?麻烦贴下flink版本和你的case, 我可以帮忙跟进确认下

Best,
Leonard Xu
&gt; 在 2020年6月11日,14:30,op <520075694@qq.com&gt; 写道:
&gt; 
&gt; 大家好,最近发现一个问题
&gt; 发现同一套代码,用oldPlanner设置IdleStateRetentionTime后join两边表的状态可以被定时清理,但是用blinkplanner同样的代码,运行却发现状态不会被清理,这是个bug吗?

Re: BLinkPlanner sql join状态清理

Posted by Leonard Xu <xb...@gmail.com>.
Hi,

可以稳定复现吗?麻烦贴下flink版本和你的case, 我可以帮忙跟进确认下

Best,
Leonard Xu
> 在 2020年6月11日,14:30,op <52...@qq.com> 写道:
> 
> 大家好,最近发现一个问题
> 发现同一套代码,用oldPlanner设置IdleStateRetentionTime后join两边表的状态可以被定时清理,但是用blinkplanner同样的代码,运行却发现状态不会被清理,这是个bug吗?