Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/22 19:26:12 UTC

[GitHub] azagrebin opened a new pull request #7163: [FLINK-10471][State TTL] State TTL cleanup using RocksDb compaction filter

URL: https://github.com/apache/flink/pull/7163
 
 
   ## What is the purpose of the change
   
   This PR introduces a Flink-specific RocksDB compaction filter to clean up expired state with TTL.
   RocksDB periodically runs compaction of state updates and merges them to free storage.
   During this process, the TTL filter checks the timestamps of state entries and drops expired ones.
   
   The feature has to be activated first in the RocksDB backend and then in the TTL config of each state descriptor (see the configuration sketch below).
   The reason is that the filter has to be set in the RocksDB options at the moment of database creation,
   when the backend is initialized or restored before the task runs.
   User state, on the other hand, is defined with a descriptor and can be created at any time later, when the task is already running.
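
   A minimal configuration sketch, assuming the option and builder names listed in the change log below (`enableTtlCompactionFilter` on the backend and `cleanupInRocksdbCompactFilter()` on the TTL builder); the exact signatures may differ in the final API:

   ```java
   // Sketch only: method names are taken from the change log of this PR,
   // the exact API may differ. `env` is the StreamExecutionEnvironment.
   RocksDBStateBackend backend = new RocksDBStateBackend("file:///tmp/checkpoints");
   backend.enableTtlCompactionFilter();          // 1) activate the filter in the backend
   env.setStateBackend(backend);

   StateTtlConfig ttlConfig = StateTtlConfig
       .newBuilder(Time.days(7))
       .cleanupInRocksdbCompactFilter()          // 2) activate cleanup for this state
       .build();

   ValueStateDescriptor<String> descriptor =
       new ValueStateDescriptor<>("last-login", String.class);
   descriptor.enableTimeToLive(ttlConfig);
   ```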
   
   If the filter is configured for the backend but not for some state, it is still called.
   The reason is that the filter always has to be created and set when the backend is restored,
   because we do not know whether the user will activate it for a state later.
   In this case the called filter does nothing, to minimize the performance impact.
   
   The compaction filter becomes part of the state info in the RocksDB backend, along with the column family handle,
   because it is tied to a native object in the RocksDB code and has to be released when the backend is disposed.
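
   A rough sketch of what such a state-info holder could look like; the actual `RocksDbKvStateInfo` introduced by this PR may hold more fields, the shape below is only illustrative:

   ```java
   import javax.annotation.Nullable;
   import org.rocksdb.ColumnFamilyHandle;
   import org.rocksdb.FlinkCompactionFilter;

   // Illustrative sketch: pairs the column family handle with the native filter
   // so both can be released together when the backend is disposed.
   class RocksDbKvStateInfo implements AutoCloseable {
       final ColumnFamilyHandle columnFamilyHandle;
       @Nullable final FlinkCompactionFilter compactionFilter; // null if the TTL filter is not used

       RocksDbKvStateInfo(
               ColumnFamilyHandle columnFamilyHandle,
               @Nullable FlinkCompactionFilter compactionFilter) {
           this.columnFamilyHandle = columnFamilyHandle;
           this.compactionFilter = compactionFilter;
       }

       @Override
       public void close() {
           // release the native resources owned by this state
           if (compactionFilter != null) {
               compactionFilter.close();
           }
           columnFamilyHandle.close();
       }
   }
   ```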
   
   The filter itself works on its own for all state types except list state.
   For list state, the filter has to call back into Flink code over a JNI API to deserialize list elements of non-fixed length
   and to determine the position of the next element to check.
   Additionally, the current timestamp can be set in the filter by Flink, for testing
   and for a future extension with event time.
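
   As an illustration only, the element-skipping callback could look roughly like this; the interface name and signature below are hypothetical and not the actual JNI contract of this PR:

   ```java
   // Hypothetical callback shape: given the serialized list value and the offset of the
   // current element, return the offset where the next element starts, so the native
   // filter can step over elements of non-fixed length when checking timestamps.
   interface ListElementIterator {
       int nextElementOffset(byte[] serializedListValue, int currentOffset);
   }
   ```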
   
   You can activate debug logging from the native code of the RocksDB filter
   by enabling the debug level for `FlinkCompactionFilter`, e.g. in tests:
   `log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG, A1`
   
   Note: The source code of the TTL filter itself resides on the RocksDB library side.
   This means that this PR is based on a manually built RocksDB version from the following PR:
   https://github.com/facebook/rocksdb/pull/4463
   That RocksDB version is not yet available for CI, which causes the CI build to fail at the moment.
   
   ## Brief change log
   
    - add enableTtlCompactionFilter option for RocksDBStateBackend
    - add feature configuration by StateTtlConfig.Builder.cleanupInRocksdbCompactFilter()
    - introduce RocksDbKvStateInfo in RocksDBKeyedStateBackend to keep track of filters along with column handles
    - add RocksDbTtlCompactFilterUtils to create and configure filter
    - add separate tests for full and incremental snapshotting with TTL
    - add unit test for compaction cleanup
    - test list state separately with fixed-length elements (ints) and non-fixed-length elements (strings)
    - add user docs
   
   ## Verifying this change
   
   This change is covered by the unit tests included in this PR.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes: rocksdbjni)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (yes: it can degrade compaction performance, which is the price of the cleanup)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs / JavaDocs, included in PR)
