Posted to issues@flink.apache.org by uce <gi...@git.apache.org> on 2016/05/30 16:11:52 UTC

[GitHub] flink pull request: [FLINK-3779] Add support for queryable state

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/2051

    [FLINK-3779] Add support for queryable state

    First of all, thanks to @tillrohrmann, @aljoscha, and @StephanEwen for discussions during and before implementing this first version. The initial design document can be found here: https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g
    
    **In a nutshell, this feature allows users to query Flink's managed partitioned state from outside of Flink. This eliminates the need for distributed operations/transactions with external systems such as key-value stores which are often the bottleneck in practice.**
    
    # APIs
    
    ## QueryableStateStream
    
    The following methods have been added as `@PublicEvolving` to `KeyedStream`:
    
    ```java
    // ValueState
    QueryableStateStream asQueryableState(
        String queryableStateName,
        ValueStateDescriptor stateDescriptor)
    
    // Shortcut for explicit ValueStateDescriptor variant
    QueryableStateStream asQueryableState(String queryableStateName)
    
    // ListState
    QueryableStateStream asQueryableState(
        String queryableStateName,
        ListStateDescriptor stateDescriptor)
    
    // FoldingState
    QueryableStateStream asQueryableState(
        String queryableStateName,
        FoldingStateDescriptor stateDescriptor)
    
    // ReducingState
    QueryableStateStream asQueryableState(
        String queryableStateName,
        ReducingStateDescriptor stateDescriptor)
    ```
    
    A call to these methods returns a `QueryableStateStream`, which currently only holds the key and value serializers for the queryable state. It is comparable to a sink: it cannot be transformed further.
    
    The `QueryableStateStream` gets translated to an operator, which uses all incoming records to update the queryable state instance. If you have a program like `stream.keyBy(0).asQueryableState("query-name")`, all records of the keyed stream will be used to update the state instance, either via `update` for `ValueState` or `add` for `AppendingState`. This acts like the Scala API's `flatMapWithState`. For an example, take a look at `QueryableStateITCase` in `flink-tests`.
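
    To make the update semantics concrete, here is a minimal sketch in plain Java. It is not the operator from this PR: `QueryableStateOperatorSketch`, `processElement`, `queryValue`, and `queryList` are hypothetical names, and two in-memory maps stand in for keyed `ValueState` and `ListState`.

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical stand-in for the operator behind asQueryableState(...).
    // It is not Flink code; it only models "every record updates the state".
    final class QueryableStateOperatorSketch {
        // Models ValueState: the latest record per key wins (update).
        private final Map<String, Long> valueState = new HashMap<>();
        // Models an AppendingState such as ListState: records accumulate (add).
        private final Map<String, List<Long>> listState = new HashMap<>();

        void processElement(String key, long value) {
            valueState.put(key, value); // corresponds to update(value)
            listState.computeIfAbsent(key, k -> new ArrayList<>()).add(value); // corresponds to add(value)
        }

        // Returns the latest value for the key, or null if none was seen.
        Long queryValue(String key) {
            return valueState.get(key);
        }

        // Returns a copy of all values seen for the key (empty if none).
        List<Long> queryList(String key) {
            List<Long> values = listState.get(key);
            return values == null ? new ArrayList<Long>() : new ArrayList<Long>(values);
        }
    }
    ```

    With this model, two records for the same key leave the value variant holding only the second record, while the list variant holds both.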
    
    I understand that these are quite a few methods to add to the public APIs, but I am not aware of another way to do it if we want to ensure type safety when providing the state descriptor. @aljoscha, you have quite some experience with designing APIs. Is there maybe a better way? And what do you (and others) think about the name `QueryableStateStream`? We could also go for the shorter `QueryableState` or something else even. I'm open to suggestions.
    
    ## QueryableStateClient
    
    This is the client used for queries against the KvState instances. The query method is this:
    
    ```java
    Future<byte[]> getKvState(
        JobID jobID,
        String queryableStateName,
        int keyHashCode,
        byte[] serializedKeyAndNamespace)
    ```
    
    A call to the method returns a Future eventually holding the serialized state value for the queryable state instance identified by `queryableStateName` of the job with ID `jobID`. The `keyHashCode` is the hash code as returned by `Object.hashCode()` and the `serializedKeyAndNamespace` is the serialized key and namespace. The client is asynchronous and can be shared by multiple Threads. An example can be seen in `QueryableStateITCase` (in `flink-tests`).
    
    The current implementation is low-level in the sense that it only works with serialized data both for providing the key/namespace and the returned results. It's the responsibility of the user (or some follow-up utilities) to set up the serializers for this. The nice thing about this is that the query services don't have to get into the business of worrying about any class loading issues etc.
    
    There are some serialization utils for key/namespace and value serialization included in `KvStateRequestSerializer`.
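
    As an illustration of what "working with serialized data" means for the caller, the following sketch encodes a (Long, String) key and decodes a long result using only the JDK. The class `KeySerializationSketch`, its methods, and the byte layout are assumptions made for this example; they are not the format produced by `KvStateRequestSerializer` or by Flink's serializers.

    ```java
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Hypothetical helper, not KvStateRequestSerializer: it illustrates that the
    // client ships opaque bytes and must agree with the job on the byte layout.
    final class KeySerializationSketch {
        // Assumed layout: 8-byte long, 4-byte string length, UTF-8 string bytes.
        static byte[] serializeKey(long id, String name) {
            byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + nameBytes.length);
            buffer.putLong(id).putInt(nameBytes.length).put(nameBytes);
            return buffer.array();
        }

        // Encodes a long state value the way the job side is assumed to.
        static byte[] serializeLongValue(long value) {
            return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        }

        // Decodes the bytes a query would return for a long-valued state.
        static long deserializeLongValue(byte[] serializedValue) {
            return ByteBuffer.wrap(serializedValue).getLong();
        }
    }
    ```

    The point of the sketch is the contract rather than the layout: if the client encodes the key differently from the job, the lookup simply finds no state for that key.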
    
    # Implementation
    
    The following sections highlight the main changes/additions.
    
    ## Added `setQueryable(String)` to `StateDescriptor`
    
    KvState instances are published for queries when they have a queryable state name set (see below). For this purpose, I've introduced the `setQueryable(String)` method to the `StateDescriptor` interface. The provided name is different from the state descriptor name we already had before. For queries, only the name provided in `setQueryable(String)` is relevant.
    
    The name needs to be unique per job. If this is not the case, the job fails at runtime with an unrecoverable exception. Unfortunately, this cannot be checked before submitting the job.
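
    The uniqueness rule can be pictured with a small sketch. `QueryableNameCheckSketch` and its `register` method are hypothetical, and the choice to let the same operator register a name repeatedly (for example once per parallel subtask) is an assumption of this example, not a statement about the check in this PR.

    ```java
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical per-job name check; the real check lives in Flink's runtime.
    final class QueryableNameCheckSketch {
        private final ConcurrentMap<String, String> ownerByName = new ConcurrentHashMap<>();

        // Claims the name for an operator. A different operator claiming the
        // same name fails, which mirrors the job failing at runtime.
        void register(String queryableStateName, String operatorName) {
            String previous = ownerByName.putIfAbsent(queryableStateName, operatorName);
            if (previous != null && !previous.equals(operatorName)) {
                throw new IllegalStateException("Queryable state name '" + queryableStateName
                    + "' is already used by " + previous);
            }
        }
    }
    ```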
    
    ## Added `byte[] getSerializedValue(byte[] serializedKeyNamespace)` to `KvState`
    
    This method is implemented by all KvState instances for queries. Since all state instances hold references to their serializers, they take care of serialization themselves and the caller does not have to.
    
    For Java heap-backed state, we deserialize the key and namespace, access the state for the key/namespace, and serialize the result. For RocksDB backed state, we can directly use the `serializedKeyAndNamespace` to access the serialized result.
    
    Furthermore, with the RocksDB state backend we don't have to worry about concurrent accesses to the state instance whereas we need `ConcurrentHashMap`s for the internal key/namespace maps of `AbstractHeapState` if the state instance is queryable.
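
    The difference between the two access paths can be sketched in plain Java. Both nested classes below are hypothetical models, not Flink code: `HeapState` stores objects and converts at query time, while `ByteStoreState` stores bytes and uses the serialized key directly. A `ConcurrentHashMap` stands in for RocksDB in the second class purely for illustration.

    ```java
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical models of the two access paths; neither class is Flink code.
    final class SerializedLookupSketch {
        // Heap-style: objects are stored, so a query deserializes the key, looks
        // it up, and serializes the result. The map is concurrent because the
        // query thread reads while the task thread writes.
        static final class HeapState {
            private final Map<String, Long> values = new ConcurrentHashMap<>();

            void update(String key, long value) {
                values.put(key, value);
            }

            byte[] getSerializedValue(byte[] serializedKey) {
                String key = new String(serializedKey, StandardCharsets.UTF_8);
                Long value = values.get(key);
                return value == null ? null : ByteBuffer.allocate(Long.BYTES).putLong(value).array();
            }
        }

        // Byte-store style: keys and values are stored serialized, so the
        // serialized key is used as-is and the stored bytes are returned.
        static final class ByteStoreState {
            private final Map<ByteBuffer, byte[]> store = new ConcurrentHashMap<>();

            void update(String key, long value) {
                store.put(ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)),
                    ByteBuffer.allocate(Long.BYTES).putLong(value).array());
            }

            byte[] getSerializedValue(byte[] serializedKey) {
                return store.get(ByteBuffer.wrap(serializedKey));
            }
        }
    }
    ```

    Both variants return the same bytes for the same key; only the amount of work done on the query path differs.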
    
    ## Added `KvStateRegistry` to TaskManager
    
    This is a very simple registry on the TaskManager. The `AbstractStateBackend` registers `KvState` instances at runtime on:
    - first call to `getPartitionedState()`, which creates the `State` instance, or
    - `injectKeyValueStateSnapshots()`.
    
    At the moment, we essentially have two variants for the state backends: either a RocksDB backed or a Java heap-backed backend (`FileSystemStateBackend`, `MemoryStateBackend`).
    
    A note on restoring: RocksDB state will only be published for queries on `getPartitionedState()` whereas Java heap-backed state is already published on `injectKeyValueStateSnapshots()`. This has to do with the way that the RocksDB state backend organizes the state internally. I didn't want to change a lot there and I think it's a fair compromise for the first version.
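
    The registry described in this section can be pictured roughly as follows. This is a sketch under assumptions: `KvStateRegistrySketch`, the numeric IDs, and the method names are made up for illustration and do not reflect the actual `KvStateRegistry` interface.

    ```java
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical TaskManager-side registry; IDs and method names are assumptions.
    final class KvStateRegistrySketch {
        private final Map<Long, Object> registered = new ConcurrentHashMap<>();
        private final AtomicLong nextId = new AtomicLong();

        // Called by the state backend when a queryable state instance is created
        // or restored. The returned ID is what a server would use for lookups.
        long register(Object kvState) {
            long id = nextId.getAndIncrement();
            registered.put(id, kvState);
            return id;
        }

        // Called when the owning task is disposed.
        void unregister(long id) {
            registered.remove(id);
        }

        // Returns the state instance, or null if it is unknown or already gone.
        Object get(long id) {
            return registered.get(id);
        }
    }
    ```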
    
    ## Added `KvStateLocationRegistry` to JobManager
    
    The `KvStateRegistry` of each TaskManager reports the registered state instances to the JobManager, where they are aggregated by the `KvStateLocationRegistry`. The purpose of this is to allow clients to query the JobManager for location information about the state they want to query. There is one `KvStateLocation` for each registered queryable state, which maps each key group index (currently the sub task index) to the server address holding the state instance.
    
    With this, the client can figure out which TaskManager to query for each key. The client needs to communicate with the JobManager only when the location is unknown or out of sync.
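
    A rough client-side sketch of this lookup, with caching, might look like the following. All names are hypothetical, the JobManager is simulated by a `Function`, and the key group assignment (a non-negative modulo of the key hash code) is an assumption for illustration; the assignment used by Flink may differ.

    ```java
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Function;

    // Hypothetical client-side view of a KvStateLocation; names and the key
    // group assignment are assumptions, not Flink's actual code.
    final class LocationLookupSketch {
        // Index = key group (currently the subtask index), value = server address.
        static final class Location {
            final String[] serverAddresses;

            Location(String[] serverAddresses) {
                this.serverAddresses = serverAddresses;
            }

            String addressFor(int keyHashCode) {
                int keyGroupIndex = Math.floorMod(keyHashCode, serverAddresses.length);
                return serverAddresses[keyGroupIndex];
            }
        }

        private final Map<String, Location> cache = new ConcurrentHashMap<>();
        private final Function<String, Location> jobManagerLookup;
        // Counts round trips to the (simulated) JobManager.
        final AtomicInteger lookups = new AtomicInteger();

        LocationLookupSketch(Function<String, Location> jobManagerLookup) {
            this.jobManagerLookup = jobManagerLookup;
        }

        // Resolves the server for a key, asking the JobManager only on a cache miss.
        String serverFor(String queryableStateName, int keyHashCode) {
            Location location = cache.computeIfAbsent(queryableStateName, name -> {
                lookups.incrementAndGet();
                return jobManagerLookup.apply(name);
            });
            return location.addressFor(keyHashCode);
        }

        // Called when a query fails because the cached location is out of sync.
        void invalidate(String queryableStateName) {
            cache.remove(queryableStateName);
        }
    }
    ```

    Repeated queries for the same state name hit the cache; only `invalidate` forces another lookup.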
    
    The lookup of `KvStateLocation` instances happens via Akka.
    
    ## Added `KvStateClient` and `KvStateServer` for network transfers
    
    The `KvStateClient` and `KvStateServer` are responsible for the actual data exchange via TCP. Each TaskManager runs a single `KvStateServer`, which queries the local `KvStateRegistry` on incoming requests.
    
    Connections are established and released by the client. The server closes a connection only on failures. Each client connection can be shared by multiple Threads.
    
    Both client and server keep track of statistics for their requests (how many requests there were and how long they took).
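
    For illustration, request statistics of this kind could be collected with a few thread-safe counters. `RequestStatsSketch` and its methods are hypothetical; the statistics classes in this PR may be structured differently.

    ```java
    import java.util.concurrent.atomic.LongAdder;

    // Hypothetical counters; the PR's own statistics classes may look different.
    final class RequestStatsSketch {
        private final LongAdder numRequests = new LongAdder();
        private final LongAdder numSuccessful = new LongAdder();
        private final LongAdder totalDurationMillis = new LongAdder();

        // Called when a request arrives (server) or is sent (client).
        void reportRequest() {
            numRequests.increment();
        }

        // Called when a request completes successfully.
        void reportSuccessfulRequest(long durationMillis) {
            numSuccessful.increment();
            totalDurationMillis.add(durationMillis);
        }

        long getNumRequests() {
            return numRequests.sum();
        }

        // Average duration of successful requests, or 0 if there were none.
        long getAverageDurationMillis() {
            long successful = numSuccessful.sum();
            return successful == 0 ? 0 : totalDurationMillis.sum() / successful;
        }
    }
    ```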
    
    # Limitations
    
    - User docs are sparse. I wanted to wait for some initial feedback with this PR before writing anything.
    
    - The queryable state life-cycle is bound to the life-cycle of the job, e.g. tasks register queryable state on startup and unregister it on dispose. In future versions, it is desirable to decouple this in order to allow queries after a task finishes and to speed up recovery via state replication.
    
    - Notifications about available `KvState` happen via a simple `tell`. This should be made more robust with `ask`s and acknowledgements. It was kept simple on purpose in anticipation of possible state replication improvements (see the previous point), which will probably need a different model for reporting available state.
    
    - The server and client keep track of statistics for queries. These are currently disabled by default as they would not be exposed anywhere. As soon as there is better support to publish these numbers via the Metrics system, we should enable the stats.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3779-queryable_state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2051.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2051
    
----
commit c1e8466f09787d229b78019e4d33d1c64932c74e
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-05-30T11:42:39Z

    [FLINK-3779] [runtime] Add getSerializedValue(byte[]) to KvState
    
    [statebackend-rocksdb, core, streaming-java]
    
    - Adds the getSerializedValue(byte[]) to KvState, which is used to query single
      KvState instances. The serialization business is left to the KvState in order
      to not burden the accessor -- e.g. the querying network thread -- with setting
      up/accessing the serializers.
    
    - Adds a queryable flag to the StateDescriptor. State that sets a queryable state
      name will be published for queries to the KvStateRegistry.
    
    - Prohibits null namespaces and enforces VoidNamespace instead. This makes things
      more explicit. Furthermore, the concurrent map used for queryable memory state
      does not allow working with null keys.

commit cd651a651c247e3638d81c3db32b9619a35aaf2a
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-05-30T12:03:35Z

    [FLINK-3779] [runtime] Add KvStateRegistry for queryable KvState
    
    [streaming-java]
    
    - Adds a KvStateRegistry per TaskManager at which created KvState instances are
      registered/unregistered.
    
    - Registered KvState instances are reported to the JobManager, which can be
      queried for KvStateLocation.

commit 3405e4a5fc3429f211b5ed0e4d6da394f7d5bb4e
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-05-30T12:00:49Z

    [FLINK-3779] [runtime] Add KvState network client and server
    
    - Adds a Netty-based server and client to query KvState instances, which have
      been published to the KvStateRegistry.

commit bb4baa33aaca8475d46da4c6c0ec3cbecbefa81a
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-05-30T12:08:03Z

    [FLINK-3779] [runtime] Add KvStateLocation lookup service
    
    - Adds an Akka-based KvStateLocation lookup service to be used by the client
      to look up location information.

commit 7256e1dae34d6dbb8d96560bd0eac7a51ced7515
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-05-30T12:08:24Z

    [FLINK-3779] [runtime] Add QueryableStateClient
    
    - Adds a client, which works with the network client and location lookup service
      to query KvState instances.
    
    - Furthermore, location information is cached.

commit d7e602a55a4cfa6825b2126d51ed711d3c6ea866
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-05-30T12:08:34Z

    [FLINK-3779] [streaming-java, streaming-scala] Add QueryableStateStream to KeyedStream
    
    [runtime, test-utils, tests]
    
    - Exposes queryable state on the API via KeyedStream#asQueryableState(String, StateDescriptor).
      This creates an operator, which consumes the keyed stream and exposes the stream
      as queryable state.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Good from my side.



[GitHub] flink pull request #2051: [FLINK-3779] Add support for queryable state

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2051



[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Posted by soniclavier <gi...@git.apache.org>.
Github user soniclavier commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Hi,
    
    Continuing the discussion from the mailing list, I was able to go past the NettyConfig problem once I ran Flink in cluster mode ( I would still like to know if there is a way to run in local mode so that I can avoid running SBT assembly every time ).
    
    But now I am stuck at error message "KvState does not hold any state for key/namespace." which I believe is because of my KeySerializer. Since I am running the QueryClient as a separate application, I don't have access to my queryableState to call `queryableState.getKeySerializer`
    
    My key is a tuple of (Long,String) and this is the naive serializer that I wrote (which is probably wrong and I have never written a serializer before)
    
    ```
    class KeySerializer extends TypeSerializerSingleton[(Long,String)]{
    
        private val EMPTY: (Long,String) = (0,"")
    
        override def createInstance(): (Long, String) = EMPTY
    
        override def getLength: Int = return 2;
    
        override def canEqual(o: scala.Any): Boolean = return o.isInstanceOf[(Long,String)]
    
        override def copy(t: (Long, String)): (Long, String) = t
    
        override def copy(t: (Long, String), t1: (Long, String)): (Long, String) = t
    
        override def copy(dataInputView: DataInputView, dataOutputView: DataOutputView): Unit =  {
          dataOutputView.writeLong(dataInputView.readLong())
          StringValue.copyString(dataInputView,dataOutputView)
        }
    
        override def serialize(t: (Long, String), dataOutputView: DataOutputView): Unit = {
          dataOutputView.writeLong(t._1)
          StringValue.writeString(t._2,dataOutputView)
        }
    
        override def isImmutableType: Boolean = true
    
        override def deserialize(dataInputView: DataInputView): (Long, String) = {
          val l = dataInputView.readLong()
          val s = StringValue.readString(dataInputView)
          (l,s)
        }
    
        override def deserialize(t: (Long, String), dataInputView: DataInputView): (Long, String) = deserialize(dataInputView)
      }
    ```
    
    Can you tell me what I am doing wrong here? Thanks!



[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Posted by soumyasd <gi...@git.apache.org>.
Github user soumyasd commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Any idea which Flink version this feature is going live with?



[GitHub] flink pull request: [FLINK-3779] Add support for queryable state

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/2051#issuecomment-222600015
  
    Thanks for the explanation. I will start testing/reviewing  this later today.



[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    A simpler way to get the serializer may be
    ```java
    // Type arguments follow the key's field order: (Long, String)
    TypeInformation.of(new TypeHint<Tuple2<Long, String>>(){}).createSerializer(null);
    ```



[GitHub] flink pull request: [FLINK-3779] Add support for queryable state

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/2051#issuecomment-222648249
  
    Hi,
    just some high-level remarks about the API of `KeyedStream` and `QueryableStateStream`. You could parameterize them both by the State type. This way you would also get a very nice way of generically accessing the different types of state on the query side. Let me quickly show what I mean. `KeyedStream` would have this method:
    
    ```
    <S extends State> QueryableStateStream<KEY, S> asQueryableState(
        String queryableStateName,
        StateDescriptor<S, ?> stateDescriptor);
    ```
    
    the signature of `QueryableStateStream` would be like this:
    
    ```
    public class QueryableStateStream<K, S extends State> {
        private StateDescriptor<S, ?> stateDescriptor;
        
        /**
         * Read the state represented by this stream using a state client
         */ 
        public S read(K key, QueryableStateClient client) {
            return stateDescriptor.bind(new QueryStateBinder(client.read(key, stateName)));
        }
    }
    ```
    
    The nice thing about `StateDescriptor.bind()` is that you can use this to create a state reader based on the state type. You pass in a custom `StateBackend` (this should really be called `StateBinder`, btw). That constructs a state reader that deserializes the bytes read from the state client.
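
    The binding idea can be sketched with a few stand-in types. Everything below is hypothetical and only mirrors the shape of the proposal: `Descriptor`, `Binder`, `ValueReader`, `LongValueDescriptor`, and `QueryBinder` are not Flink classes, and the value is assumed to be an 8-byte long.

    ```java
    import java.nio.ByteBuffer;
    import java.util.function.Function;

    // Hypothetical types mirroring the proposal; none of these are Flink classes.
    final class BinderSketch {
        interface State {}

        // Read-only view of a value state on the query side.
        interface ValueReader<T> extends State {
            T value();
        }

        // The binder decides how a state of a given kind is materialized.
        interface Binder {
            <T> ValueReader<T> createValueState(Function<byte[], T> deserializer);
        }

        // The descriptor knows its state type S and asks the binder to build it.
        interface Descriptor<S extends State> {
            S bind(Binder binder);
        }

        // Descriptor for a long-valued state (assumed 8-byte encoding).
        static final class LongValueDescriptor implements Descriptor<ValueReader<Long>> {
            @Override
            public ValueReader<Long> bind(Binder binder) {
                return binder.createValueState(bytes -> ByteBuffer.wrap(bytes).getLong());
            }
        }

        // Binder backed by bytes that were already fetched by a state client.
        static final class QueryBinder implements Binder {
            private final byte[] serializedValue;

            QueryBinder(byte[] serializedValue) {
                this.serializedValue = serializedValue;
            }

            @Override
            public <T> ValueReader<T> createValueState(Function<byte[], T> deserializer) {
                return () -> deserializer.apply(serializedValue);
            }
        }
    }
    ```

    The caller gets back a typed reader without naming a serializer itself, which is the convenience the proposal is after.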




[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Regarding local vs. cluster mode: that's on purpose, but we can certainly change that behaviour. For now, you would have to run in cluster mode. 
    
    Regarding the serializer: assuming that it is a Flink `Tuple2<Long, String>` you can use the following to get the serializer:
    
    ```java
    // Field serializers must follow the tuple's field order: (Long, String)
    TypeSerializer<?>[] fieldSerializers = new TypeSerializer[] {
        LongSerializer.INSTANCE,
        StringSerializer.INSTANCE
    };
    
    TypeSerializer<Tuple2<Long, String>> serializer = new TupleSerializer<>(
        (Class<Tuple2<Long, String>>) (Class<?>) Tuple2.class, fieldSerializers);
    ```
    
    **Just to make sure that we are on the same page: the state of this PR is not the final queryable state API, but only the initial low-level version.** Really looking forward to further feedback. Thank you for trying it out at this stage. :-)



[GitHub] flink pull request: [FLINK-3779] Add support for queryable state

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/2051#issuecomment-222625496
  
    1. I think that you are very much looking at it with your specific use case in mind, which is (I think) querying other operators from within your operators. To me, that's not the main use case for queryable state though... For that, it's true that the `QueryableStateStream` does not provide much help. If you are already using partitioned state manually, then you will probably go with the `setQueryable` method, that's true. But for some state like Flink's internal windows, you don't have access to the `StateDescriptor` (which is currently not exposed for queries on the API though). Furthermore, I think it's good to provide a low barrier way of doing things. But if others feel the same, I'm certainly OK with removing it.
    
    2. I think that would be possible, yes. I agree that the `QueryableStateStream` is conceptually similar to a sink.
    
    3. I agree, but that was on purpose for the first version until we figure out how to expose it properly. You have utilities in `KvStateRequestSerializer` to serialize key and namespace and you can use `QueryableStateStream` to access the key and value serializer. Namespace is usually `VoidNamespace` if you are not querying windows.



[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Posted by soniclavier <gi...@git.apache.org>.
Github user soniclavier commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Never mind, I was querying with the wrong key. It works now! Cheers.



[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    @soniclavier I think this is not configurable in Flink at the moment. The client uses the `LeaderRetrievalService` to retrieve the job manager path. 
    
    @soumyasd I hope to merge this after the 1.1 fork-off this week. This would mean that it would be part of the 1.2 release (~3 months if everything goes according to release schedule).



[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Now that we have forked off the 1.1 release branch, I would like to merge this if there are no objections. There are not many changes to our current code base and the follow-ups can be addressed before the 1.2 release.



[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Posted by soniclavier <gi...@git.apache.org>.
Github user soniclavier commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Thanks Ufuk & Stephan for the reply,
    
    I tried the serializers suggested by you
    ```
    val typeHint = new TypeHint[Tuple2[Long,String]](){}
    val serializer = TypeInformation.of(typeHint).createSerializer(null)
    
    //also tried this
    val fieldSerializers = Array[TypeSerializer[_]](StringSerializer.INSTANCE, LongSerializer.INSTANCE)
    val serializer2 = new TupleSerializer(classOf[Tuple2[Long,String]].asInstanceOf[Class[_]].asInstanceOf[Class[Tuple2[String, Long]]], fieldSerializers)
    ```
    
    But both give me a compilation error at
    ```
    val serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
          key,
          serializer2,
          VoidNamespace.INSTANCE,
          VoidNamespaceSerializer.INSTANCE)
    ```
    the compilation error is:
    ```
    Error:(43, 7) type mismatch;
    found   : org.apache.flink.api.common.typeutils.TypeSerializer[org.apache.flink.api.java.tuple.Tuple2[Long,String]]
     required: org.apache.flink.api.common.typeutils.TypeSerializer[java.io.Serializable]
    Note: org.apache.flink.api.java.tuple.Tuple2[Long,String] <: java.io.Serializable, but Java-defined class TypeSerializer is invariant in type T.
    You may wish to investigate a wildcard type such as `_ <: java.io.Serializable`. (SLS 3.2.10)
          serializer,
          ^
    ```
    
    I had seen this before when I tried to set the serializer from `queryableState.getKeySerializer` 
    
    Note : It works fine when I use the longer version of serializer that I created.



[GitHub] flink pull request: [FLINK-3779] Add support for queryable state

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/2051#issuecomment-222627001
  
    You are probably right that I am a little biased regarding (1), sorry :)



[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Posted by soniclavier <gi...@git.apache.org>.
Github user soniclavier commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    One more question: is it possible to configure the JobManager actor path that the client connects to? It looks like it defaults to `akka://flink/user/jobmanager`.
    That way I could create a much more generic client.
    
    Note: I know this is initial version, just curious if this is already implemented. :)
    




[GitHub] flink pull request: [FLINK-3779] Add support for queryable state

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/2051#issuecomment-222548679
  
    Awesome feature Ufuk, I am very excited to try this out and give some feedback :)
    
    So if I understand correctly, in order to query the state I can use the queryable state stream. Would it also be possible to query the state from an arbitrary operator with the same logic (from a map, for instance)? Also, what happens if I call setQueryable... on a ValueStateDescriptor when creating a state in my UDF?



[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Posted by rehevkor5 <gi...@git.apache.org>.
Github user rehevkor5 commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Hi, it's great to see that someone is working on this stuff!
    
    I just wanted to put in my two cents, to provide a different perspective that might change how you are thinking about this.
    
    On my project, we are interested in incorporating pre-computed historical time-series data into the values within a time window. Those values would need to be loaded from a distributed database such as Cassandra or DynamoDB. Also, we would like for newly computed time-series data points (produced by a Flink window pane) to be persisted externally, side-by-side with the historical data (in Cassandra/DynamoDB).
    
    In contrast with your approach, which enables querying of state from within Flink, we are more interested in querying that state from the external database. This allows the Flink job to produce time series data which can be queried ad-hoc in the database, while also allowing the Flink job to produce pre-calculated aggregates from that time series.
    
    I believe others have mentioned in this thread the need, therefore, to allow the State Store to choose the serialization approach. While serializing to byte[] works well for Memory and RocksDB State Stores, inserting into a NoSQL database requires creation of an INSERT command with data that includes primary/partition key, secondary/range key, and arbitrarily structured data (one column of byte[], or perhaps more complex based on the particular type of value). In particular, we need the timestamp of the time series point to be a top-level value in the INSERT, so that time range queries can be efficient. The interface is also important when it comes to Flink loading pre-existing data, because Flink or an integration layer will need to know how to query for the particular keys it is looking for.
    
    I hope that makes sense & gives some perspective on what some people are thinking about with regard to "queryable state".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Posted by soniclavier <gi...@git.apache.org>.
Github user soniclavier commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Sorry, the compilation error occurred because the `Tuple2` was `scala.Tuple2`, not Flink's `Tuple2`. Changing it to `org.apache.flink.api.java.tuple.Tuple2` fixed the issue.



[GitHub] flink pull request: [FLINK-3779] Add support for queryable state

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/2051#issuecomment-222554801
  
    Hey Gyula!
    
    The `QueryableStateStream` is only for convenience. It just creates an `AbstractQueryableStateOperator`, which sets up an operator that consumes data and calls the `state.update()` or `state.add()` method, respectively.
    
    You can use the `QueryableStateClient` anywhere you like, including your map operator. Just be aware that additional threads will be started for the network communication (configurable). You can share the client between operators, though.
    
    You can make any `KvState` queryable by calling the `setQueryable(String)` method of its state descriptor.




[GitHub] flink pull request: [FLINK-3779] Add support for queryable state

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/2051#issuecomment-222606282
  
    I will gradually add some questions/comments as I go :)
    
    1. Do we really need a QueryableStateStream exposed in the API? As you said, this is just a pretty basic sink that anyone who knows Flink well enough to use the States can probably inline. And I am guessing that looking inside other operators is probably the most interesting use case for this new feature.
    
    2. (If we decide to keep the QueryableStateStream, could it just be implemented as a simple RichSink, i.e. stream.addSink(...), instead of adding another operator to the runtime layer?)
    
    3. I think it would be great to make the KvStateClient somehow aware of the types, because passing serialized key and namespace byte arrays will be confusing for users, who will have no idea what the namespace is. The namespaces are pretty internal to the system. Maybe we could allow users to register serializers for state IDs in the KvStateClient, so that they would not need to pass byte arrays manually and could work with the actual keys. Also, if they don't want to specify the namespace, we could serialize it with the VoidSerializer, as usually happens in the runtime.
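    
    A minimal sketch of what point 3 could look like, assuming a hypothetical client-side registry. `TypedKvStateClientSketch`, its `Serializer` interface, and the byte layout are all invented for illustration; they are not Flink's `TypeSerializer`, not the PR's client, and not Flink's wire format. The idea is only that users register serializers per state name once and then pass actual keys instead of byte arrays.
    
    ```java
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.HashMap;
    import java.util.Map;
    
    // Hypothetical sketch: none of these types are part of Flink.
    final class TypedKvStateClientSketch {
    
        /** Stand-in for a real serializer type; hypothetical. */
        interface Serializer<T> {
            void serialize(T value, DataOutputStream out) throws IOException;
        }
    
        static final Serializer<String> STRING = (value, out) -> out.writeUTF(value);
        static final Serializer<Long> LONG = (value, out) -> out.writeLong(value);
        // Writes nothing: a placeholder for "no namespace" (illustrative layout only).
        static final Serializer<Object> NO_NAMESPACE = (value, out) -> { };
    
        private static final class Registration {
            final Serializer<?> keySerializer;
            final Serializer<?> namespaceSerializer;
    
            Registration(Serializer<?> keySerializer, Serializer<?> namespaceSerializer) {
                this.keySerializer = keySerializer;
                this.namespaceSerializer = namespaceSerializer;
            }
        }
    
        private final Map<String, Registration> registrations = new HashMap<>();
    
        /** Registers key and namespace serializers for one state name. */
        <K, N> void register(String stateName, Serializer<K> keySerializer, Serializer<N> namespaceSerializer) {
            registrations.put(stateName, new Registration(keySerializer, namespaceSerializer));
        }
    
        /** Registers only a key serializer; the namespace falls back to the placeholder. */
        <K> void register(String stateName, Serializer<K> keySerializer) {
            register(stateName, keySerializer, NO_NAMESPACE);
        }
    
        /** Serializes key and namespace with the serializers registered for the state. */
        <K, N> byte[] serializeKeyAndNamespace(String stateName, K key, N namespace) {
            Registration registration = registrations.get(stateName);
            if (registration == null) {
                throw new IllegalArgumentException("No serializers registered for state: " + stateName);
            }
            // Unchecked: a key of the wrong type fails with ClassCastException at serialization time.
            @SuppressWarnings("unchecked")
            Serializer<K> keySerializer = (Serializer<K>) registration.keySerializer;
            @SuppressWarnings("unchecked")
            Serializer<N> namespaceSerializer = (Serializer<N>) registration.namespaceSerializer;
    
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                keySerializer.serialize(key, out);
                namespaceSerializer.serialize(namespace, out);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }
    
        /** Convenience for states whose namespace the user does not care about. */
        <K> byte[] serializeKey(String stateName, K key) {
            return serializeKeyAndNamespace(stateName, key, null);
        }
    }
    ```
    
    With something like this, a caller would write `client.register("counts", TypedKvStateClientSketch.LONG)` once and then `client.serializeKey("counts", 42L)` per lookup, never touching the namespace.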

