Posted to dev@flink.apache.org by "Q Kang (Jira)" <ji...@apache.org> on 2021/08/11 10:04:00 UTC

[jira] [Created] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend

Q Kang created FLINK-23721:
------------------------------

             Summary: Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
                 Key: FLINK-23721
                 URL: https://issues.apache.org/jira/browse/FLINK-23721
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends, Table SQL / Runtime
    Affects Versions: 1.13.0
            Reporter: Q Kang


Take the following deduplication SQL program as an example:
{code:sql}
SET table.exec.state.ttl=30s;

INSERT INTO tmp.blackhole_order_done_log
SELECT t.* FROM (
  SELECT *,ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime ASC) AS rn
  FROM rtdw_ods.kafka_order_done_log
) AS t WHERE rn = 1;
{code}
When using RocksDBStateBackend with incremental checkpointing enabled, the deduplication state stays at a reasonable size.

FlinkCompactionFilter is also working, according to the logs below:
{code:java}
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3481026D01, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Last access timestamp: 1628673475181 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Decision: 1
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3484064901, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Last access timestamp: 1628673672777 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Decision: 0
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3483341D01, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Last access timestamp: 1628673618973 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                            [] - RocksDB filter native code log: Decision: 1
{code}
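As a sanity check on the log excerpt above, the following minimal sketch recomputes the filter decisions from the logged values. It rests on several assumptions that are not confirmed by the logs themselves: that the first 8 bytes of the "Data" field are the big-endian last-access timestamp in epoch milliseconds (consistent with timestamp_offset: 0), that the trailing byte is the serialized user value, that an entry counts as expired once last access + TTL <= current timestamp, and that Decision 1 means "remove" and Decision 0 means "keep". The class and method names are hypothetical and exist only for illustration.

```java
final class TtlLogCheck {

    // Assumption: the first 8 bytes (16 hex digits) of the logged "Data" field
    // hold the big-endian last-access timestamp in epoch milliseconds.
    static long decodeTimestamp(String dataHex) {
        return Long.parseLong(dataHex.substring(0, 16), 16);
    }

    // Assumption: an entry is expired once lastAccess + ttl <= now.
    // Overflow handling is ignored here because the logged values are far from Long.MAX_VALUE.
    static boolean isExpired(long lastAccessMs, long ttlMs, long nowMs) {
        return lastAccessMs + ttlMs <= nowMs;
    }

    public static void main(String[] args) {
        long ttlMs = 30_000L;          // "TTL: 30000 ms" from the log
        long nowMs = 1628673701905L;   // "Current timestamp" from the log

        // "Data" values copied from the three FilterV2 calls in the log.
        String[] data = {
            "0000017B3481026D01",
            "0000017B3484064901",
            "0000017B3483341D01"
        };

        for (String d : data) {
            long lastAccessMs = decodeTimestamp(d);
            System.out.println(d + " -> last access " + lastAccessMs
                    + " ms, expired=" + isExpired(lastAccessMs, ttlMs, nowMs));
        }
    }
}
```

Under these assumptions the decoded timestamps equal the "Last access timestamp" values in the log (1628673475181, 1628673672777 and 1628673618973 ms), and the three entries come out as expired, not expired and expired, which lines up with the logged decisions 1, 0 and 1.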
However, after turning off incremental checkpointing, the state TTL appears to have no effect at all: no FlinkCompactionFilter logs are printed, and the deduplication state grows steadily to several GB (Kafka traffic is fairly heavy, at about 1K records per second).

In contrast, state TTL always works as expected with FsStateBackend.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)