You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by op <52...@qq.com> on 2020/06/11 06:46:06 UTC
回复: BLinkPlanner sql join状态清理
一开始用的Blinkplanner,试了几天状态都清理不掉,改成oldplanner后就可以了,版本是1.10
package test.table.sql
import java.util.Properties
import com.souhu.msns.huyou.PublicParams
import com.souhu.msns.huyou.utils.KafkaPbSchema
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.windowing.time.{Time => WindowTime}
import org.apache.flink.types.Row
object test {
def main(args: Array[String]): Unit = {
//----------------------------配置执行环境------------------------------------------------
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
bsEnv.setNumberOfExecutionRetries(1)
bsEnv.setParallelism(1)
//bsEnv.getConfig.setAutoWatermarkInterval(10000)
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
bsEnv.setStateBackend(new FsStateBackend("hdfs://dc1:8020/user/msns/streaming/checkpoint/flink/Circ", true))
bsEnv.getCheckpointConfig.setCheckpointInterval(300000)
bsEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
bsEnv.setParallelism(3)
bsEnv.setNumberOfExecutionRetries(1)
//----------------------------配置TABLE环境------------------------------------------------
val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bstEnv = StreamTableEnvironment.create(bsEnv,setting)
val tConfig = bstEnv.getConfig
tConfig.setIdleStateRetentionTime(Time.minutes(10),Time.minutes(20))
val config = bstEnv.getConfig.getConfiguration()
config.setString("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
config.setString("table.exec.mini-batch.allow-latency", "5 s")
config.setString("table.exec.mini-batch.size", "5000")
config.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation
config.setString("table.optimizer.distinct-agg.split.enabled", "true")
//bstEnv.getConfig.setLocalTimeZone(ZoneId.of("Etc/GMT+8"))
//----------------------------创建数据源和表------------------------------------------------
val kafkaProps = new Properties()
kafkaProps.setProperty("bootstrap.servers", PublicParams.brokers)
val source = ....
.toTable(bstEnv,'userId,'createTime.rowtime,'action,'circleName,'flowName,'ts,'content,'feedid,'postfeedid,'sessionId)
bstEnv.createTemporaryView("source",source)
val q1=bstEnv.sqlQuery(
"""select sessionId from source
|where sessionId is not null
|and action='P_TIMELINE'""".stripMargin)
q1.toAppendStream[Row].print("source")
bstEnv.createTemporaryView("sourcefeed",q1)
val q2=bstEnv.sqlQuery(
"""select sessionId from source
|where sessionId is not null
|and action='V_TIMELINE_FEED'""".stripMargin)
bstEnv.createTemporaryView("postfeed",q2)
bstEnv.sqlQuery(
"""
|select count(b.sessionId) from
|sourcefeed a
|join postfeed b
|on a.sessionId=b.sessionId
""".stripMargin).toRetractStream[Row].print("")
bstEnv.execute("")
}
}
------------------ 原始邮件 ------------------
发件人: "Leonard Xu"<xbjtdcq@gmail.com>;
发送时间: 2020年6月11日(星期四) 下午2:40
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: BLinkPlanner sql join状态清理
Hi,
可以稳定复现吗?麻烦贴下flink版本和你的case, 我可以帮忙跟进确认下
Best,
Leonard Xu
> 在 2020年6月11日,14:30,op <520075694@qq.com> 写道:
>
> 大家好,最近发现一个问题
> 发现同一套代码,用oldPlanner设置IdleStateRetentionTime后join两边表的状态可以被定时清理,但是用blinkplanner同样的代码,运行却发现状态不会被清理,这是个bug吗?
Re: Re: 回复: BLinkPlanner sql join状态清理
Posted by 刘大龙 <ld...@zju.edu.cn>.
Hi, MiniBatch Agg目前没有实现State TTl,我提了个PR修复这个问题,参考https://github.com/apache/flink/pull/11830
@Jark,辛苦有空时帮忙reveiw一下代码,这个问题越来越多用户用户遇到了。
> -----原始邮件-----
> 发件人: "刘建刚" <li...@gmail.com>
> 发送时间: 2020-09-29 18:27:47 (星期二)
> 收件人: user-zh <us...@flink.apache.org>
> 抄送:
> 主题: Re: 回复: BLinkPlanner sql join状态清理
>
> miniBatch下是无法ttl的,这个是修复方案:https://github.com/apache/flink/pull/11830
>
> Benchao Li <li...@apache.org> 于2020年9月29日周二 下午5:18写道:
>
> > Hi Ericliuk,
> >
> > 这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
> > 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~
> >
> > Ericliuk <er...@gmail.com> 于2020年9月29日周二 下午4:59写道:
> >
> > > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> > > <
> > >
> > http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png
> > >
> > >
> > >
> > > 不太清楚为什么用了mini batch就没读取这个配置。
> > > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
------------------------------
Best
Re: 回复: BLinkPlanner sql join状态清理
Posted by 刘建刚 <li...@gmail.com>.
miniBatch下是无法ttl的,这个是修复方案:https://github.com/apache/flink/pull/11830
Benchao Li <li...@apache.org> 于2020年9月29日周二 下午5:18写道:
> Hi Ericliuk,
>
> 这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
> 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~
>
> Ericliuk <er...@gmail.com> 于2020年9月29日周二 下午4:59写道:
>
> > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> > <
> >
> http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png
> >
> >
> >
> > 不太清楚为什么用了mini batch就没读取这个配置。
> > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> --
>
> Best,
> Benchao Li
>
Re: 回复: BLinkPlanner sql join状态清理
Posted by Benchao Li <li...@apache.org>.
Hi Ericliuk,
这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~
Ericliuk <er...@gmail.com> 于2020年9月29日周二 下午4:59写道:
> 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> <
> http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png>
>
>
> 不太清楚为什么用了mini batch就没读取这个配置。
> 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
--
Best,
Benchao Li
Re: 回复: BLinkPlanner sql join状态清理
Posted by Ericliuk <er...@gmail.com>.
我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
<http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png>
不太清楚为什么用了mini batch就没读取这个配置。
一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
--
Sent from: http://apache-flink.147419.n8.nabble.com/