Posted to issues@flink.apache.org by "liangxiaokun (Jira)" <ji...@apache.org> on 2022/06/02 10:09:00 UTC
[jira] [Comment Edited] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17545393#comment-17545393 ]
liangxiaokun edited comment on FLINK-23721 at 6/2/22 10:08 AM:
---------------------------------------------------------------
Hello! I have the same question as [~lmagics].
In my main class I use two state TTL settings. One is a StateTtlConfig in the DataStream API, applied to a MapState. This is the code:
{code:java}
MapStateDescriptor<String,String> firstItemState = new MapStateDescriptor<String,String>("firstState", String.class,String.class);
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
firstItemState.enableTimeToLive(ttlConfig);
mapState = getRuntimeContext().getMapState(firstItemState); {code}
The other one is in the SQL API:
{code:java}
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));
//ttl
TableConfig config = streamTableEnvironment.getConfig();
config.setIdleStateRetention(Duration.ofSeconds(10));{code}
I used RocksDBStateBackend
{code:java}
env.enableCheckpointing(4 * 60000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setCheckpointTimeout(20 * 60000);
env.disableOperatorChaining();
env.setStateBackend(new RocksDBStateBackend("hdfs:///user/ucloud_bi/lxk/flink_state/order_detail_realtime"));
{code}
But the TTL does not seem to work: in the Flink web UI, managed memory is shown as fully used.
!image-2022-06-02-17-59-38-542.png!
was (Author: JIRAUSER289974):
Hello! I have the same question as [~lmagics].
In my main class I use two state TTL settings. One is a StateTtlConfig in the DataStream API, applied to a MapState. This is the code:
{code:java}
MapStateDescriptor<String,String> firstItemState = new MapStateDescriptor<String,String>("firstState", String.class,String.class);
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
firstItemState.enableTimeToLive(ttlConfig);
mapState = getRuntimeContext().getMapState(firstItemState); {code}
The other one is in the SQL API:
{code:java}
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));
//ttl
TableConfig config = streamTableEnvironment.getConfig();
config.setIdleStateRetention(Duration.ofSeconds(10));{code}
I used RocksDBStateBackend
{code:java}
env.enableCheckpointing(4 * 60000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setCheckpointTimeout(20 * 60000);
env.disableOperatorChaining();
env.setStateBackend(new FsStateBackend("hdfs:///user/flink_state/order_detail_realtime"));
{code}
But the TTL does not seem to work: in the Flink web UI, managed memory is shown as fully used.
!image-2022-06-02-17-59-38-542.png!
> Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
> --------------------------------------------------------------------------------
>
> Key: FLINK-23721
> URL: https://issues.apache.org/jira/browse/FLINK-23721
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends, Table SQL / Runtime
> Affects Versions: 1.13.0
> Reporter: Q Kang
> Priority: Major
> Attachments: image-2022-06-02-17-59-38-542.png
>
>
> Take the following deduplication SQL program as an example:
> {code:java}
> SET table.exec.state.ttl=30s;
> INSERT INTO tmp.blackhole_order_done_log
> SELECT t.* FROM (
> SELECT *,ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime ASC) AS rn
> FROM rtdw_ods.kafka_order_done_log
> ) AS t WHERE rn = 1;
> {code}
> When using RocksDBStateBackend with incremental checkpoints enabled, the size of the deduplication state seems OK.
> FlinkCompactionFilter is also working, according to the logs below:
> {code:java}
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3481026D01, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Last access timestamp: 1628673475181 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Decision: 1
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3484064901, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Last access timestamp: 1628673672777 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Decision: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3483341D01, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Last access timestamp: 1628673618973 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Decision: 1
> {code}
> However, after turning off incremental checkpoints, the state TTL does not seem to take effect at all: FlinkCompactionFilter logs are not printed, and the size of the deduplication state grows steadily to several GB (Kafka traffic is fairly heavy, at about 1K records per second).
> In contrast, FsStateBackend always works well.
>
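For readers following the FlinkCompactionFilter log lines quoted above, here is a minimal sketch of how the logged values relate to each other. It is an illustration, not Flink's actual native filter code. The class and method names are hypothetical. The rule "expired when last access + TTL <= current time" is an assumption that happens to reproduce the three logged decisions, and reading the first 8 bytes of the logged Data field as a big-endian millisecond timestamp is an inference from the fact that those bytes match the logged "Last access timestamp" values.

```java
/** Hypothetical sketch of the expiry check implied by the quoted log lines. */
class TtlDecisionSketch {

    /** Reads the first 8 bytes (16 hex digits) of the logged Data field as a big-endian timestamp in ms. */
    static long lastAccessFromHex(String dataHex) {
        return Long.parseLong(dataHex.substring(0, 16), 16);
    }

    /** Assumed rule: an entry counts as expired once lastAccess + ttl is not after the current time. */
    static boolean isExpired(long lastAccessMs, long ttlMs, long nowMs) {
        return lastAccessMs + ttlMs <= nowMs;
    }

    public static void main(String[] args) {
        long ttlMs = 30_000L;              // "TTL: 30000 ms" in the log
        long nowMs = 1_628_673_701_905L;   // "Current timestamp" in the log
        String[] loggedData = {
            "0000017B3481026D01",
            "0000017B3484064901",
            "0000017B3483341D01"
        };
        for (String data : loggedData) {
            long lastAccess = lastAccessFromHex(data);
            int decision = isExpired(lastAccess, ttlMs, nowMs) ? 1 : 0;
            System.out.println(data + " lastAccess=" + lastAccess + " decision=" + decision);
        }
    }
}
```

Under these assumptions the three entries yield decisions 1, 0 and 1, matching the "Decision" lines in the log: the second entry was last accessed about 29 seconds before the compaction ran, which is inside the 30 second TTL, while the other two are older.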
--
This message was sent by Atlassian Jira
(v8.20.7#820007)