You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by op <52...@qq.com> on 2020/06/10 08:07:48 UTC

Flink sql 状态清理问题

hi,
写了个测试程序:


......
val tConfig = bstEnv.getConfigconfg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25))......val q1=bstEnv.sqlQuery(
  """select createTime,feedid from source
    |where circleName is not null
    |and circleName not in('','_')
    |and action = 'C_FEED_EDIT_SEND'
    |""".stripMargin)
 bstEnv.createTemporaryView("sourcefeed",q1)
val q2=bstEnv.sqlQuery(
  """select feedid,postfeedid,action from source
    |where circleName is not null
    |and circleName not in('','_')
    |and action in('C_PUBLISH','C_FORWARD_PUBLISH')
    |""".stripMargin)

bstEnv.createTemporaryView("postfeed",q2)
bstEnv.sqlQuery(
  """
    |select count(b.postfeedid) from
    |sourcefeed a
    |join postfeed b
    |on a.feedid=b.postfeedid
  """.stripMargin).toRetractStream[Row](confg).print("")
//------------------------------------程序里面设置了状态失效最晚时间是空闲25分钟,但是运行了几天了,我再web上观察到的状态一直再不断增加,可以确定关联的id最多只会活跃1个小时左右,请问是哪里没设置对还是join两边的state不支持清理?

Re: Flink sql 状态清理问题

Posted by Benchao Li <li...@apache.org>.
Hi,

Join算子的state是支持清理的。
可以提供下以下信息:
- Flink 版本
- planner (blink planner / old planner)

op <52...@qq.com> 于2020年6月10日周三 下午4:08写道:

> hi,
> 写了个测试程序:
>
> ......
>
> val tConfig = bstEnv.getConfig
>
> confg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25))
>
> ......
>
> val q1=bstEnv.sqlQuery(
>   """select createTime,feedid from source
>     |where circleName is not null
>     |and circleName not in('','_')
>     |and action = 'C_FEED_EDIT_SEND'
>     |""".stripMargin)
>
>
>  bstEnv.createTemporaryView("sourcefeed",q1)
> val q2=bstEnv.sqlQuery(
>   """select feedid,postfeedid,action from source
>     |where circleName is not null
>     |and circleName not in('','_')
>     |and action in('C_PUBLISH','C_FORWARD_PUBLISH')
>     |""".stripMargin)
>
> bstEnv.createTemporaryView("postfeed",q2)
> bstEnv.sqlQuery(
>   """
>     |select count(b.postfeedid) from
>     |sourcefeed a
>     |join postfeed b
>     |on a.feedid=b.postfeedid
>   """.stripMargin).toRetractStream[Row](confg).print("")
>
>
> //------------------------------------
>
> 程序里面设置了状态失效最晚时间是空闲25分钟,但是运行了几天了,我再web上观察到的状态一直再不断增加,可以确定关联的id最多只会活跃1个小时左右,请问是哪里没设置对还是join两边的state不支持清理?
>
>