You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 李航飞 <te...@163.com> on 2021/08/26 11:02:33 UTC

table.exec.state.ttl

Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15s");
conf.setString("table.exec.mini-batch.size","50");
conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); execEnv.configure(conf,this.getClass().getClassLoader()); EnvironmentSetting setting = ... StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2
微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
上次那个 allow 也就算了,这次这个 table.exec.state.ttl 设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。
程序是通过StatementSet .execute()执行的

Re: table.exec.state.ttl

Posted by Yun Tang <my...@live.com>.
Hi 航飞

可以参照[1] 看是不是类似的问题


[1] https://issues.apache.org/jira/browse/FLINK-23721

祝好
唐云
________________________________
From: 李航飞 <te...@163.com>
Sent: Thursday, August 26, 2021 19:02
To: user-zh <us...@flink.apache.org>
Subject: table.exec.state.ttl

Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15s");
conf.setString("table.exec.mini-batch.size","50");
conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); execEnv.configure(conf,this.getClass().getClassLoader()); EnvironmentSetting setting = ... StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2
微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
上次那个 allow 也就算了,这次这个 table.exec.state.ttl 设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。
程序是通过StatementSet .execute()执行的

Re:Re: table.exec.state.ttl

Posted by 李航飞 <te...@163.com>.
你好:

我现在想在 execution environment 里面设置微批和stateValue的过期时间该怎么设?

这样 conf.setString("exec.state.ttl","15 s");
或者这样  conf.setString("stream.exec.state.ttl","15 s");











在 2021-08-26 19:05:07,"Caizhi Weng" <ts...@gmail.com> 写道:
>Hi!
>
>table 层的配置是加在 table environment 的 table config 里的,加在 execution environment
>里无效。
>
>李航飞 <te...@163.com> 于2021年8月26日周四 下午7:02写道:
>
>> Configuration conf = new Configuration();
>> conf.setString("table.exec.mini-batch.enabled","true");
>> conf.setString("table.exec.mini-batch.allow-latency","15s");
>> conf.setString("table.exec.mini-batch.size","50");
>> conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment
>> execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> execEnv.configure(conf,this.getClass().getClassLoader());
>> EnvironmentSetting setting = ...
>> StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2
>> 微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
>> 上次那个 allow 也就算了,这次这个 table.exec.state.ttl
>> 设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。
>> 程序是通过StatementSet .execute()执行的

Re: table.exec.state.ttl

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

table 层的配置是加在 table environment 的 table config 里的,加在 execution environment
里无效。

李航飞 <te...@163.com> 于2021年8月26日周四 下午7:02写道:

> Configuration conf = new Configuration();
> conf.setString("table.exec.mini-batch.enabled","true");
> conf.setString("table.exec.mini-batch.allow-latency","15s");
> conf.setString("table.exec.mini-batch.size","50");
> conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment
> execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> execEnv.configure(conf,this.getClass().getClassLoader());
> EnvironmentSetting setting = ...
> StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2
> 微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
> 上次那个 allow 也就算了,这次这个 table.exec.state.ttl
> 设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。
> 程序是通过StatementSet .execute()执行的