Posted to issues@flink.apache.org by "liangxiaokun (Jira)" <ji...@apache.org> on 2022/06/02 10:09:00 UTC

[jira] [Comment Edited] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend

    [ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17545393#comment-17545393 ] 

liangxiaokun edited comment on FLINK-23721 at 6/2/22 10:08 AM:
---------------------------------------------------------------

Hello! I have the same problem as [~lmagics].
In my main class I configure state TTL in two places. The first is in the DataStream API, on a MapState:

 
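{code:java}
// In a rich function (getRuntimeContext() is available); mapState is a MapState<String, String> field.
MapStateDescriptor<String, String> firstItemState =
        new MapStateDescriptor<>("firstState", String.class, String.class);
// Entries expire 10 minutes after they were created or last written (reads do not
// extend the TTL), and expired entries are never returned, even before cleanup.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.minutes(10)) // org.apache.flink.api.common.time.Time
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
firstItemState.enableTimeToLive(ttlConfig);
mapState = getRuntimeContext().getMapState(firstItemState);
{code}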
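To make the two TTL settings in the snippet above concrete, here is a minimal, stdlib-only Java sketch of what `OnCreateAndWrite` plus `NeverReturnExpired` mean for a single map. It is not Flink's implementation: the class `TtlMapSketch`, its methods, and the explicit millisecond clock are hypothetical. It assumes, following Flink's documented default as understood here, that an expired value is dropped when it is read, while entries that are never read again stay stored until some background cleanup runs. That gap between "invisible" and "physically removed" is what lets state keep growing when the cleanup does not run.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical, stdlib-only model of a TTL-enabled MapState; NOT Flink's implementation.
// Models UpdateType.OnCreateAndWrite and StateVisibility.NeverReturnExpired.
final class TtlMapSketch<K, V> {
    // A stored value plus the time of its last create/write.
    private static final class Entry<T> {
        final T value;
        final long lastWriteMs;
        Entry(T value, long lastWriteMs) { this.value = value; this.lastWriteMs = lastWriteMs; }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMs;

    TtlMapSketch(long ttlMs) { this.ttlMs = ttlMs; }

    private boolean expired(Entry<V> e, long nowMs) {
        return e.lastWriteMs + ttlMs <= nowMs;
    }

    // OnCreateAndWrite: only creating or writing a value refreshes its timestamp.
    void put(K key, V value, long nowMs) {
        entries.put(key, new Entry<>(value, nowMs));
    }

    // NeverReturnExpired: an expired value is never returned; here it is also dropped on read.
    // Reading does not refresh the timestamp.
    V get(K key, long nowMs) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (expired(e, nowMs)) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }

    // Stand-in for background cleanup (e.g. a compaction filter): drops every expired entry.
    void cleanup(long nowMs) {
        entries.values().removeIf(e -> expired(e, nowMs));
    }

    // Entries physically held, including expired ones nobody has read yet.
    int storedSize() { return entries.size(); }

    public static void main(String[] args) {
        long ttl = 10 * 60_000L; // matches Time.minutes(10)
        TtlMapSketch<String, String> state = new TtlMapSketch<>(ttl);
        state.put("a", "x", 0L);
        state.put("b", "y", 0L);
        System.out.println(state.get("a", 5 * 60_000L)); // x (the read does not refresh "a")
        System.out.println(state.get("a", ttl));         // null (expired, dropped on read)
        System.out.println(state.storedSize());          // 1 ("b" expired but never read)
        state.cleanup(ttl);
        System.out.println(state.storedSize());          // 0
    }
}
{code}

Without the `cleanup` call, key `"b"` would stay stored indefinitely even though no read could ever return it.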
 

The second is in the SQL API:

 
{code:java}
StreamTableEnvironment streamTableEnvironment =
        StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));

// TTL for SQL operator state: state that has been idle for longer than
// 10 seconds may be cleared (on a best-effort basis).
TableConfig config = streamTableEnvironment.getConfig();
config.setIdleStateRetention(Duration.ofSeconds(10));
{code}
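For SQL jobs, the idle-state retention set above (which, as far as is known here, sets the same `table.exec.state.ttl` option used in the issue description) is what should keep the per-key state of a keep-first deduplication, such as the issue's `ROW_NUMBER() ... WHERE rn = 1` query, from growing without bound. The sketch below is a hypothetical, stdlib-only model of that idea, not Flink's deduplication operator: `KeepFirstDedupSketch` and its methods are made up, and it assumes a duplicate row does not refresh its key's timestamp. It also shows the trade-off of retention: once a key's state is gone, a late duplicate is emitted again.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical keep-first deduplication with idle-state retention; NOT Flink's operator.
// Assumption: a duplicate row does not refresh its key's timestamp (create/write only).
final class KeepFirstDedupSketch {
    // key -> time the first row for that key was seen
    private final Map<String, Long> firstSeenMs = new HashMap<>();
    private final long retentionMs;

    KeepFirstDedupSketch(long retentionMs) { this.retentionMs = retentionMs; }

    private boolean expired(long seenMs, long nowMs) {
        return seenMs + retentionMs <= nowMs;
    }

    // Returns true if the row is emitted: it is the first row for its key,
    // or the key's state has expired so the key looks new again.
    boolean process(String key, long nowMs) {
        Long seen = firstSeenMs.get(key);
        if (seen != null && !expired(seen, nowMs)) {
            return false; // duplicate within retention: drop it
        }
        firstSeenMs.put(key, nowMs);
        return true;
    }

    // Stand-in for background state cleanup: without it, keys that never recur stay forever.
    void cleanup(long nowMs) {
        firstSeenMs.values().removeIf(seen -> expired(seen, nowMs));
    }

    int stateSize() { return firstSeenMs.size(); }

    public static void main(String[] args) {
        KeepFirstDedupSketch dedup = new KeepFirstDedupSketch(30_000L); // table.exec.state.ttl=30s
        System.out.println(dedup.process("order-1", 0L));      // true
        System.out.println(dedup.process("order-1", 10_000L)); // false
        System.out.println(dedup.process("order-2", 10_000L)); // true
        dedup.cleanup(35_000L);
        System.out.println(dedup.stateSize());                 // 1
        System.out.println(dedup.process("order-1", 40_000L)); // true
    }
}
{code}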
 

 

I used RocksDBStateBackend as the state backend:

 
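{code:java}
env.enableCheckpointing(4 * 60000, CheckpointingMode.EXACTLY_ONCE); // checkpoint every 4 minutes
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setCheckpointTimeout(20 * 60000);          // 20-minute timeout
env.disableOperatorChaining();
// With this single-argument constructor, incremental checkpointing follows the
// configured default (state.backend.incremental, false unless set).
env.setStateBackend(new RocksDBStateBackend("hdfs:///user/ucloud_bi/lxk/flink_state/order_detail_realtime"));
{code}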
However, the TTL settings do not seem to take effect: in the Flink web UI, managed memory is fully used.

!image-2022-06-02-17-59-38-542.png!

> Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-23721
>                 URL: https://issues.apache.org/jira/browse/FLINK-23721
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends, Table SQL / Runtime
>    Affects Versions: 1.13.0
>            Reporter: Q Kang
>            Priority: Major
>         Attachments: image-2022-06-02-17-59-38-542.png
>
>
> Take the following deduplication SQL program as an example:
> {code:java}
> SET table.exec.state.ttl=30s;
> INSERT INTO tmp.blackhole_order_done_log
> SELECT t.* FROM (
>   SELECT *,ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime ASC) AS rn
>   FROM rtdw_ods.kafka_order_done_log
> ) AS t WHERE rn = 1;
> {code}
> When using RocksDBStateBackend with incremental checkpointing enabled, the size of the deduplication state seems OK.
> FlinkCompactionFilter is also working, as the logs below show:
> {code:java}
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3481026D01, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Last access timestamp: 1628673475181 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Decision: 1
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3484064901, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Last access timestamp: 1628673672777 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Decision: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3483341D01, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Last access timestamp: 1628673618973 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Decision: 1
> {code}
> However, after turning off incremental checkpointing, state TTL does not seem to take effect at all: FlinkCompactionFilter logs are not printed, and the deduplication state grows steadily to several GB (Kafka traffic is fairly heavy, about 1K records per second).
> In contrast, FsStateBackend always works well.
>  
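The `FlinkCompactionFilter` DEBUG lines quoted above can be checked by hand. The Java sketch below recomputes their decisions under two assumptions that the logs do not confirm: the first 8 bytes (16 hex digits) of `Data` hold the big-endian last-access timestamp (they do decode to the logged `Last access timestamp` values), and an entry is expired once `lastAccess + ttl <= currentTimestamp`. The overflow cap is only inferred from the `ttlWithoutOverflow` field name, and `CompactionDecisionSketch` is a hypothetical class; the real filter runs as native code. With `TTL: 30000 ms` and the logged current timestamp, it yields decisions 1, 0, 1, matching the three `Decision` lines (in RocksDB's `CompactionFilter::Decision`, 0 is keep and 1 is remove).

{code:java}
// Hypothetical re-check of the FlinkCompactionFilter decisions in the DEBUG logs above.
// Assumptions: "Data" starts with an 8-byte big-endian last-access timestamp, and an
// entry is expired once lastAccess + ttl <= currentTimestamp. Not Flink's native code.
final class CompactionDecisionSketch {
    static final int KEEP = 0;   // logged as "Decision: 0"
    static final int REMOVE = 1; // logged as "Decision: 1"

    // Decodes the last-access timestamp from the first 16 hex digits of "Data".
    static long lastAccessFromData(String dataHex) {
        return Long.parseUnsignedLong(dataHex.substring(0, 16), 16);
    }

    // Caps the TTL so that lastAccess + ttl cannot overflow (inferred from "ttlWithoutOverflow").
    static long ttlWithoutOverflow(long lastAccessMs, long ttlMs) {
        if (lastAccessMs > 0 && ttlMs > Long.MAX_VALUE - lastAccessMs) {
            return Long.MAX_VALUE - lastAccessMs;
        }
        return ttlMs;
    }

    static int decide(long lastAccessMs, long ttlMs, long currentMs) {
        long expiresAtMs = lastAccessMs + ttlWithoutOverflow(lastAccessMs, ttlMs);
        return expiresAtMs <= currentMs ? REMOVE : KEEP;
    }

    public static void main(String[] args) {
        long ttlMs = 30_000L;            // "TTL: 30000 ms" (table.exec.state.ttl=30s)
        long currentMs = 1628673701905L; // "Current timestamp" in all three log entries
        String[] data = {"0000017B3481026D01", "0000017B3484064901", "0000017B3483341D01"};
        for (String d : data) {
            long lastAccessMs = lastAccessFromData(d);
            System.out.println("Data " + d + ": last access " + lastAccessMs
                    + " ms, decision " + decide(lastAccessMs, ttlMs, currentMs));
        }
    }
}
{code}

This only explains what the filter decides when it runs; the problem reported in the issue is that, with incremental checkpointing turned off, these log lines are not printed at all.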



--
This message was sent by Atlassian Jira
(v8.20.7#820007)