Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/07/21 15:52:20 UTC

[jira] [Commented] (FLINK-3779) Add support for queryable state

    [ https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387930#comment-15387930 ] 

ASF GitHub Bot commented on FLINK-3779:
---------------------------------------

Github user soniclavier commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Hi,
    
    Continuing the discussion from the mailing list, I was able to get past the NettyConfig problem once I ran Flink in cluster mode (I would still like to know if there is a way to run in local mode, so that I can avoid running SBT assembly every time).
    
    But now I am stuck at the error message "KvState does not hold any state for key/namespace.", which I believe is caused by my KeySerializer. Since I am running the QueryClient as a separate application, I don't have access to my queryableState to call `queryableState.getKeySerializer`.
    
    My key is a tuple of (Long, String), and this is the naive serializer that I wrote (it is probably wrong; I have never written a serializer before):
    
    ```
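    import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
    import org.apache.flink.core.memory.{DataInputView, DataOutputView}
    import org.apache.flink.types.StringValue

    class KeySerializer extends TypeSerializerSingleton[(Long,String)]{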
    
        private val EMPTY: (Long,String) = (0,"")
    
        override def createInstance(): (Long, String) = EMPTY
    
        // -1 marks a variable-length type: the String field has no fixed size
        override def getLength: Int = -1
    
        // canEqual is called with another serializer (from equals), not with a key
        override def canEqual(o: scala.Any): Boolean = o.isInstanceOf[KeySerializer]
    
        override def copy(t: (Long, String)): (Long, String) = t
    
        override def copy(t: (Long, String), t1: (Long, String)): (Long, String) = t
    
        override def copy(dataInputView: DataInputView, dataOutputView: DataOutputView): Unit =  {
          dataOutputView.writeLong(dataInputView.readLong())
          StringValue.copyString(dataInputView,dataOutputView)
        }
    
        override def serialize(t: (Long, String), dataOutputView: DataOutputView): Unit = {
          dataOutputView.writeLong(t._1)
          StringValue.writeString(t._2,dataOutputView)
        }
    
        override def isImmutableType: Boolean = true
    
        override def deserialize(dataInputView: DataInputView): (Long, String) = {
          val l = dataInputView.readLong()
          val s = StringValue.readString(dataInputView)
          (l,s)
        }
    
        override def deserialize(t: (Long, String), dataInputView: DataInputView): (Long, String) = deserialize(dataInputView)
      }
    ```
    
    Can you tell me what I am doing wrong here? Thanks!
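    
    For reference, here is a minimal sketch of how the write/read order of the serializer above could be checked in isolation, outside of Flink. It assumes plain `java.io` streams as stand-ins for `DataInputView`/`DataOutputView`, and uses `writeUTF`/`readUTF` in place of `StringValue.writeString`/`readString`. The byte encoding of the string differs from Flink's, so this only checks that writes and reads are symmetric, not the actual wire format. The object name `KeyRoundTrip` is hypothetical.
    
    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    object KeyRoundTrip {
      // Same field order as the serializer above: the Long first, then the String.
      def serialize(key: (Long, String)): Array[Byte] = {
        val bytes = new ByteArrayOutputStream()
        val out = new DataOutputStream(bytes)
        out.writeLong(key._1)
        out.writeUTF(key._2) // stand-in for StringValue.writeString (different encoding)
        out.flush()
        bytes.toByteArray
      }

      // Reads the fields back in the order they were written.
      def deserialize(data: Array[Byte]): (Long, String) = {
        val in = new DataInputStream(new ByteArrayInputStream(data))
        val l = in.readLong()
        val s = in.readUTF()
        (l, s)
      }

      def main(args: Array[String]): Unit = {
        val key = (42L, "user-1")
        val restored = deserialize(serialize(key))
        assert(restored == key, s"round trip changed the key: $restored")
        println(s"round trip ok: $restored")
      }
    }
    ```
    
    If a round trip like this succeeds, the field order is at least consistent on both sides, which would suggest the mismatch lies elsewhere (for example in how the key or namespace bytes are produced on the job side).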


> Add support for queryable state
> -------------------------------
>
>                 Key: FLINK-3779
>                 URL: https://issues.apache.org/jira/browse/FLINK-3779
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee fault-tolerant processing of streams. Users can work with both non-partitioned (Checkpointed interface) and partitioned state (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state that are all scoped to the key of the current input element. This type of state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions with external systems such as key-value stores which are often the bottleneck in practice. Exposing the local state to the outside moves a good part of the database work into the stream processor, allowing both high throughput queries and immediate access to the computed state.
> This is the initial design doc for the feature: https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g. Feel free to comment.


