Posted to issues@flink.apache.org by "Elias Levy (JIRA)" <ji...@apache.org> on 2016/09/01 17:11:20 UTC

[jira] [Commented] (FLINK-3947) Provide low level access to RocksDB state backend

    [ https://issues.apache.org/jira/browse/FLINK-3947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456040#comment-15456040 ] 

Elias Levy commented on FLINK-3947:
-----------------------------------

The use case is maintaining a large set of items as state for a keyed stream, where items may be added to or removed from the set incrementally as new messages are processed.

There are a number of problems with the current RocksDB state backend implementation:

- It can handle aggregate state in a keyed stream that won't fit in memory, as long as the state for each key fits in memory, but it can't handle per-key state that won't fit in memory.

- Even when per-key state fits in memory, it is extremely inefficient at handling state that consists of a large set or list of items, where a small number of items are added to or removed from the set/list each time a message is processed. The current implementation serializes the whole set/list as a single value, and thus has to deserialize and reserialize all of it every time a single item is added or removed.
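A rough way to see the cost described in the second point is the toy model below. It is plain Java with no Flink or RocksDB classes, and every name in it is hypothetical. It counts how many bytes are serialized when N items are appended one at a time, first with the whole list stored as a single value, then with one key-value entry per item. Only serialization is counted; the deserialization the whole-value layout also performs is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (hypothetical, not Flink's API): bytes serialized per update when a
// per-key list is stored as one value versus one key-value entry per item.
public class StateLayoutCost {

    // Whole-value layout: every update re-serializes the entire list.
    static long wholeValueBytes(int updates, String item) {
        List<String> list = new ArrayList<>();
        long written = 0;
        for (int i = 0; i < updates; i++) {
            list.add(item);
            for (String s : list) { // every element is serialized again
                written += s.getBytes(StandardCharsets.UTF_8).length;
            }
        }
        return written;
    }

    // Per-item layout: each update writes only the entry for the new item.
    static long perItemBytes(int updates, String item) {
        Map<Integer, String> store = new TreeMap<>();
        long written = 0;
        for (int i = 0; i < updates; i++) {
            store.put(i, item);
            written += item.getBytes(StandardCharsets.UTF_8).length;
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(wholeValueBytes(1000, "item-0001")); // 4504500
        System.out.println(perItemBytes(1000, "item-0001"));    // 9000
    }
}
```

With a 9-byte item, the whole-value layout writes 9 * (1 + 2 + ... + 1000) bytes, which grows quadratically with the number of updates, while the per-item layout grows linearly.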

An example of such state is an LRU cache for values extracted from a stream, where each processElement invocation may add elements to and remove elements from the keyed state.

With low-level access to RocksDB, such an LRU cache could be implemented easily. Set items would be serialized one per key-value pair in RocksDB, with each key starting with the stream partition key and also encoding the value's timestamp. Expiration is then just a matter of iterating over the key range, deleting expired keys, and stopping at the first value that should not be expired. Alternatively, if RocksDB's TTL functionality were exposed, that could be used.
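The key layout and expiration scan described above might look roughly like the following sketch. It assumes a `java.util.TreeMap` ordered by unsigned lexicographic `byte[]` comparison as a stand-in for RocksDB's default bytewise key ordering, and it assumes non-negative timestamps. The exact layout (a 4-byte length prefix before the partition key, then an 8-byte big-endian timestamp, then the item) and all class and method names are illustrative, not an existing Flink or RocksDB API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: TreeMap with unsigned byte[] ordering stands in for RocksDB.
// Hypothetical key layout: [4-byte pk length][pk bytes][8-byte big-endian ts][item bytes]
public class ExpiringSetSketch {

    private final TreeMap<byte[], byte[]> store = new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);

    // Prefix shared by all entries of one stream partition key.
    static byte[] prefix(String partitionKey) {
        byte[] pk = partitionKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + pk.length).putInt(pk.length).put(pk).array();
    }

    static byte[] encodeKey(String partitionKey, long timestamp, String item) {
        byte[] p = prefix(partitionKey);
        byte[] it = item.getBytes(StandardCharsets.UTF_8);
        // Non-negative big-endian longs sort numerically under unsigned byte comparison.
        return ByteBuffer.allocate(p.length + 8 + it.length).put(p).putLong(timestamp).put(it).array();
    }

    void add(String partitionKey, long timestamp, String item) {
        store.put(encodeKey(partitionKey, timestamp, item), new byte[0]);
    }

    // Deletes entries of partitionKey older than cutoff, stopping at the first
    // live entry: every later key in this range has a larger timestamp.
    int expire(String partitionKey, long cutoff) {
        byte[] p = prefix(partitionKey);
        int removed = 0;
        Iterator<Map.Entry<byte[], byte[]>> it = store.tailMap(p, true).entrySet().iterator();
        while (it.hasNext()) {
            byte[] key = it.next().getKey();
            if (key.length < p.length + 8
                    || Arrays.compareUnsigned(key, 0, p.length, p, 0, p.length) != 0) {
                break; // left this partition key's contiguous range
            }
            long ts = ByteBuffer.wrap(key, p.length, 8).getLong();
            if (ts >= cutoff) {
                break; // first non-expired entry
            }
            it.remove();
            removed++;
        }
        return removed;
    }

    int size() {
        return store.size();
    }

    public static void main(String[] args) {
        ExpiringSetSketch s = new ExpiringSetSketch();
        s.add("user-1", 100L, "a");
        s.add("user-1", 200L, "b");
        s.add("user-1", 300L, "c");
        s.add("user-2", 50L, "x");
        System.out.println(s.expire("user-1", 250L)); // 2
        System.out.println(s.size());                  // 2
    }
}
```

Because all keys sharing a prefix form one contiguous range in a bytewise-ordered store, the scan never touches other partition keys. A true LRU would also need to delete an item's old entry when its recency is refreshed, which in turn needs a secondary item-to-timestamp index; the sketch leaves that out.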

Even if low-level access to RocksDB is not provided, Flink needs other state implementations for container data types, where the data is stored and accessed more efficiently than by serializing/deserializing the whole data structure.






> Provide low level access to RocksDB state backend
> -------------------------------------------------
>
>                 Key: FLINK-3947
>                 URL: https://issues.apache.org/jira/browse/FLINK-3947
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>
> The current state API is limiting, and some implementations are not as efficient as they could be, particularly when working with large states. For instance, a ListState is append-only: you cannot remove values from the list. And the RocksDBListState get() implementation reads all list values from RocksDB instead of returning an Iterable that only reads values as needed.
> Furthermore, RocksDB is an ordered KV store, yet there is no ordered map state API with an ability to iterate over the stored values in order.
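The two gaps named in the issue description (an Iterable that reads values only as needed, and an ordered map state with in-order iteration and removal) could be covered by an API shaped roughly like the sketch below. This is a hypothetical illustration, not a Flink interface. It again uses a `TreeMap` with unsigned `byte[]` ordering as a stand-in for RocksDB, assumes non-negative `long` map keys, and counts decoded entries only to show that iteration is lazy.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical ordered map state with lazy, in-order iteration and removal.
// Not a Flink API; TreeMap with unsigned byte[] ordering stands in for RocksDB.
public class OrderedMapStateSketch {

    private final TreeMap<byte[], String> store = new TreeMap<byte[], String>(Arrays::compareUnsigned);
    int decoded = 0; // number of entries actually read, to show laziness

    // [4-byte length][stream key][8-byte big-endian user key]; userKey must be >= 0
    private static byte[] key(String streamKey, long userKey) {
        byte[] sk = streamKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + sk.length + 8).putInt(sk.length).put(sk).putLong(userKey).array();
    }

    void put(String streamKey, long userKey, String value) {
        store.put(key(streamKey, userKey), value);
    }

    void remove(String streamKey, long userKey) {
        store.remove(key(streamKey, userKey));
    }

    // Entries of streamKey with from <= userKey < to (requires from <= to), in
    // key order. Nothing is read until the caller advances the iterator.
    Iterable<Map.Entry<Long, String>> range(String streamKey, long from, long to) {
        return () -> {
            Iterator<Map.Entry<byte[], String>> it = store
                    .subMap(key(streamKey, from), true, key(streamKey, to), false)
                    .entrySet().iterator();
            return new Iterator<Map.Entry<Long, String>>() {
                public boolean hasNext() {
                    return it.hasNext();
                }

                public Map.Entry<Long, String> next() {
                    Map.Entry<byte[], String> e = it.next();
                    decoded++;
                    byte[] k = e.getKey();
                    long userKey = ByteBuffer.wrap(k, k.length - 8, 8).getLong();
                    return new AbstractMap.SimpleImmutableEntry<>(userKey, e.getValue());
                }

                public void remove() {
                    it.remove(); // removal during iteration, unlike append-only ListState
                }
            };
        };
    }

    public static void main(String[] args) {
        OrderedMapStateSketch s = new OrderedMapStateSketch();
        s.put("user-1", 30L, "c");
        s.put("user-1", 10L, "a");
        s.put("user-1", 20L, "b");
        s.put("user-2", 15L, "x");
        for (Map.Entry<Long, String> e : s.range("user-1", 0L, 25L)) {
            System.out.println(e.getKey() + "=" + e.getValue()); // 10=a, then 20=b
        }
    }
}
```

Because the range view walks the underlying ordered store one entry at a time, a caller that stops early never reads the rest of the range, which is the behavior the description asks for in place of reading all list values up front.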



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)