Posted to issues@flink.apache.org by soniclavier <gi...@git.apache.org> on 2016/07/21 15:51:51 UTC

[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

Github user soniclavier commented on the issue:

    https://github.com/apache/flink/pull/2051
  
    Hi,
    
    Continuing the discussion from the mailing list: I was able to get past the NettyConfig problem once I ran Flink in cluster mode. (I would still like to know whether there is a way to run in local mode, so that I can avoid running SBT assembly every time.)
    
    But now I am stuck on the error message "KvState does not hold any state for key/namespace.", which I believe is caused by my KeySerializer. Since I am running the QueryClient as a separate application, I don't have access to my queryableState to call `queryableState.getKeySerializer`.
    
    My key is a tuple of (Long, String), and this is the naive serializer that I wrote (it is probably wrong, as I have never written a serializer before):
    
    ```
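    import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
    import org.apache.flink.core.memory.{DataInputView, DataOutputView}
    import org.apache.flink.types.StringValue

    class KeySerializer extends TypeSerializerSingleton[(Long, String)] {

        private val EMPTY: (Long, String) = (0L, "")

        override def createInstance(): (Long, String) = EMPTY

        // -1 marks a variable-length type: the String field has no fixed size
        override def getLength: Int = -1

        // canEqual compares serializers with each other, not the values they handle
        override def canEqual(o: scala.Any): Boolean = o.isInstanceOf[KeySerializer]

        // Tuples of Long and String are immutable, so copies can share the instance
        override def copy(t: (Long, String)): (Long, String) = t

        override def copy(t: (Long, String), t1: (Long, String)): (Long, String) = t

        // Copy one serialized record: the Long first, then the String
        override def copy(dataInputView: DataInputView, dataOutputView: DataOutputView): Unit = {
          dataOutputView.writeLong(dataInputView.readLong())
          StringValue.copyString(dataInputView, dataOutputView)
        }

        // Field order must match deserialize below
        override def serialize(t: (Long, String), dataOutputView: DataOutputView): Unit = {
          dataOutputView.writeLong(t._1)
          StringValue.writeString(t._2, dataOutputView)
        }

        override def isImmutableType: Boolean = true

        override def deserialize(dataInputView: DataInputView): (Long, String) = {
          val l = dataInputView.readLong()
          val s = StringValue.readString(dataInputView)
          (l, s)
        }

        // The reuse instance is ignored because tuples are immutable
        override def deserialize(t: (Long, String), dataInputView: DataInputView): (Long, String) = deserialize(dataInputView)
    }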
    ```
    
    Can you tell me what I am doing wrong here? Thanks!
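    
    For reference, a hand-written serializer is not the only option: a serializer for the key type can also be derived from Flink's Scala type information, without access to the running job. The block below is a minimal sketch of that idea, assuming the job keys its stream by a Scala (Long, String) tuple and relies on the Scala API's type extraction. The names `DerivedKeySerializer` and `roundTrip` are hypothetical, and nothing in this thread confirms that this resolves the error.
    
    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeutils.TypeSerializer
    import org.apache.flink.api.scala._
    import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}

    // Hypothetical helper, not part of Flink
    object DerivedKeySerializer {

      // Assumption: the job's key type is exactly (Long, String)
      val keySerializer: TypeSerializer[(Long, String)] =
        createTypeInformation[(Long, String)].createSerializer(new ExecutionConfig)

      // Serialize a key to bytes and read it back with the same serializer
      def roundTrip(key: (Long, String)): (Long, String) = {
        val bytes = new ByteArrayOutputStream()
        keySerializer.serialize(key, new DataOutputViewStreamWrapper(bytes))
        val in = new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray))
        keySerializer.deserialize(in)
      }
    }
    ```
    
    A round trip such as `DerivedKeySerializer.roundTrip((1L, "a"))` only checks that the serializer is self-consistent. Whether it matches the bytes the job expects depends on how the job's own key serializer was created.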

