You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/08/02 18:04:20 UTC

[jira] [Commented] (FLINK-3779) Add support for queryable state

    [ https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15404491#comment-15404491 ] 

ASF GitHub Bot commented on FLINK-3779:
---------------------------------------

Github user rehevkor5 commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Hi, it's great to see that someone is working on this stuff!
    
    I just wanted to put in my two cents, to provide a different perspective that might change how you are thinking about this.
    
    On my project, we are interested in incorporating pre-computed historical time-series data into the values within a time window. Those values would need to be loaded from a distributed database such as Cassandra or DynamoDB. Also, we would like for newly computed time-series data points (produced by a Flink window pane) to be persisted externally, side-by-side with the historical data (in Cassandra/DynamoDB).
    
    In contrast with your approach, which enables querying of state from within Flink, we are more interested in querying that state from the external database. This allows the Flink job to produce time series data which can be queried ad-hoc in the database, while also allowing the Flink job to produce pre-calculated aggregates from that time series.
    
    I believe others have mentioned in this thread the need, therefore, to allow the State Store to choose the serialization approach. While serializing to byte[] works well for Memory and RocksDB State Stores, inserting into a NoSQL database requires creation of an INSERT command with data that includes primary/partition key, secondary/range key, and arbitrarily structured data (one column of byte[], or perhaps more complex based on the particular type of value). In particular, we need the timestamp of the time series point to be a top-level value in the INSERT, so that time range queries can be efficient. The interface is also important when it comes to Flink loading pre-existing data, because Flink or an integration layer will need to know how to query for the particular keys it is looking for.
    
    I hope that makes sense & gives some perspective on what some people are thinking about with regard to "queryable state".


> Add support for queryable state
> -------------------------------
>
>                 Key: FLINK-3779
>                 URL: https://issues.apache.org/jira/browse/FLINK-3779
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee fault-tolerant processing of streams. Users can work with both non-partitioned (Checkpointed interface) and partitioned state (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state that are all scoped to the key of the current input element. This type of state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions with external systems such as key-value stores which are often the bottleneck in practice. Exposing the local state to the outside moves a good part of the database work into the stream processor, allowing both high throughput queries and immediate access to the computed state.
> This is the initial design doc for the feature: https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g. Feel free to comment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)