You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by 李诗君 <li...@126.com> on 2022/05/23 12:32:33 UTC

flink sql api, exception when setting "table.exec.state.ttl"

flink version: 1.13.5


java code:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://test-wh-hadoop-1:9000/flink-checkpoints");

env.setStateBackend(new RocksDBStateBackend("hdfs://test-wh-hadoop-1:9000/flink-checkpoints",
                true));

tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
Configuration configuration = tableEnv.getConfig().getConfiguration();
//        configuration.setString("table.exec.resource.default-parallelism","16");
configuration.setString("table.exec.state.ttl","7200000");


and when I submit this job , I got this:


Sink: Sink(table=[default_catalog.default_database.rts_board_trans_compute], fields=[mchnt_id, time_hour, channel, trans_count, trans_amount, average_amount]) (1/1) (f8649f8434775cbda10bcedce96c9ae3) switched fromINITIALIZING to FAILEDon container_1647420330066_0473_01_000002 @ test-wh-hadoop-1 (dataPort=38604).
java.lang.UnsatisfiedLinkError: org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
    at org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(NativeMethod) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
    at org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:13) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
    at org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:107) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
    at org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:133) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
    at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:167) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:144) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:208) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:143) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:130) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119) ~[flink-table-blink_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]




It will be quite normal if I take away this line of code:
configuration.setString("table.exec.state.ttl","7200000");


so ,what’s wrong with this setting? 


any suggestion will be appreciated.










 





 

Re:Re: flink sql api, exception when setting "table.exec.state.ttl"

Posted by 李诗君 <li...@126.com>.


I have feagured this out.
It was because I put a flink-connector-tidb-cdc.jar in my Flink's lib folder earlier, and it is shipped with scala 2.11, while my flink is shipped with scala2.12.
Some how when I submit a job with GroupAggregate operator, it needs to load keyed rocksdb states, and here come into a conflict.
I will look into it and give a solution.










At 2022-05-23 20:55:39, "Chesnay Schepler" <ch...@apache.org> wrote:

You're probably mixing Flink versions.


From the stack trace we can see that Flink classes are being loaded from 2 different jars (rocketmq-flink-1.0.0-SNAPSHOT.jar/flink-dist_2.12-1.13.5.jar); I'd suggest to resolve that first and see if the error persists.



On 23/05/2022 14:32, 李诗君 wrote:

flink version: 1.13.5


java code:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);env.enableCheckpointing(60000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://test-wh-hadoop-1:9000/flink-checkpoints");

        env.setStateBackend(new RocksDBStateBackend("hdfs://test-wh-hadoop-1:9000/flink-checkpoints",
                true));

        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
        Configuration configuration = tableEnv.getConfig().getConfiguration();
//        configuration.setString("table.exec.resource.default-parallelism","16");
        configuration.setString("table.exec.state.ttl","7200000");


and when I submit this job , I got this:


Sink: Sink(table=[default_catalog.default_database.rts_board_trans_compute], fields=[mchnt_id, time_hour, channel, trans_count, trans_amount, average_amount]) (1/1) (f8649f8434775cbda10bcedce96c9ae3) switched fromINITIALIZING to FAILEDon container_1647420330066_0473_01_000002 @ test-wh-hadoop-1 (dataPort=38604).
java.lang.UnsatisfiedLinkError: org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
    at org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(NativeMethod) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
    at org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:13) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
    at org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:107) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
    at org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:133) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
    at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:167) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:144) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:208) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:143) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:130) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119) ~[flink-table-blink_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]




It will be quite normal if I take away this line of code:
configuration.setString("table.exec.state.ttl","7200000");


so ,what’s wrong with this setting? 


any suggestion will be appreciated.










 





 





 



Re: flink sql api, exception when setting "table.exec.state.ttl"

Posted by Chesnay Schepler <ch...@apache.org>.
You're probably mixing Flink versions.

 From the stack trace we can see that Flink classes are being loaded 
from 2 different jars 
(rocketmq-flink-1.0.0-SNAPSHOT.jar/flink-dist_2.12-1.13.5.jar); I'd 
suggest to resolve that first and see if the error persists.

On 23/05/2022 14:32, 李诗君 wrote:
> flink version: 1.13.5
>
> java code:
>
>         StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings =EnvironmentSettings.newInstance()
>                  .useBlinkPlanner()
>                  .inStreamingMode()
>                  .build(); StreamTableEnvironmenttableEnv = StreamTableEnvironment.create(env, settings);env.enableCheckpointing(60000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(
>                  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // 
> env.getCheckpointConfig().setCheckpointStorage("hdfs://test-wh-hadoop-1:9000/flink-checkpoints"); 
> env.setStateBackend(new RocksDBStateBackend("hdfs://test-wh-hadoop-1:9000/flink-checkpoints", true)); tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); Configuration configuration =tableEnv.getConfig().getConfiguration(); // 
> configuration.setString("table.exec.resource.default-parallelism","16"); 
> configuration.setString("table.exec.state.ttl","7200000");
>
> and when I submit this job , I got this:
>
> Sink: 
> Sink(table=[default_catalog.default_database.rts_board_trans_compute], 
> fields=[mchnt_id, time_hour, channel, trans_count, trans_amount, 
> average_amount]) (1/1) (f8649f8434775cbda10bcedce96c9ae3) switched 
> from INITIALIZING to FAILED on container_1647420330066_0473_01_000002 
> @ test-wh-hadoop-1 (dataPort=38604).
> java.lang.UnsatisfiedLinkError: 
> org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
>     at 
> org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native 
> Method) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
>     at 
> org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:13) 
> ~[flink-dist_2.12-1.13.5.jar:2.2.0]
>     at 
> org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:107) 
> ~[flink-dist_2.12-1.13.5.jar:2.2.0]
>     at 
> org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:133) 
> ~[flink-dist_2.12-1.13.5.jar:2.2.0]
>     at 
> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:167) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:144) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:208) 
> ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
>     at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:143) 
> ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
>     at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:130) 
> ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
>     at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70) 
> ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
>     at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301) 
> ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
>     at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352) 
> ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
>     at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) 
> ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
>     at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) 
> ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
>     at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119) 
> ~[flink-table-blink_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) 
> ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
>     at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) 
> ~[flink-dist_2.12-1.13.5.jar:1.13.5]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) 
> ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
>
>
> It will be quite normal if I take away this line of code:
> configuration.setString("table.exec.state.ttl","7200000");
>
> so ,what’s wrong with this setting?
>
> any suggestion will be appreciated.
>
>
>
>
>
>
>
>
>