Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2018/11/07 16:27:04 UTC

Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant, could you check that the UUID key on the TM is actually
serialized using a Kryo serializer? You can do this by setting a breakpoint
in the constructor of the `AbstractKeyedStateBackend`.

Cheers,
Till

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <bu...@163.com> wrote:

> Hi, Jayant
>
>     Your code looks good to me. I’ve also tried serializing and
> deserializing the UUID class with Kryo, and it all looks okay.
>
>     I’m not sure what causes this problem. Maybe you can write a very
> simple demo to check whether it works.
>
>
> Jiayi Liao, Best
>
>  Original Message
> *Sender:* Jayant Ameta<wi...@gmail.com>
> *Recipient:* bupt_ljy<bu...@163.com>
> *Cc:* Tzu-Li (Gordon) Tai<tz...@apache.org>; user<user@flink.apache.org>
> *Date:* Monday, Oct 29, 2018 11:53
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Hi Jiayi,
> Any further help on this?
>
> Jayant Ameta
>
>
> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <wi...@gmail.com> wrote:
>
>> MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
>>     String.class);
>>
>> Jayant Ameta
>>
>>
>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <bu...@163.com> wrote:
>>
>>> Hi,
>>>
>>>    Can you show us the descriptor used in the code below?
>>>
>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>
>>> Jiayi Liao, Best
>>>
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta<wi...@gmail.com>
>>> *Recipient:* bupt_ljy<bu...@163.com>
>>> *Cc:* Tzu-Li (Gordon) Tai<tz...@apache.org>; user<
>>> user@flink.apache.org>
>>> *Date:* Friday, Oct 26, 2018 02:26
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Also, I haven't provided any custom serializer in my Flink job.
>>> Shouldn't the same configuration work for the queryable state client?
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <wi...@gmail.com>
>>> wrote:
>>>
>>>> Hi Gordon,
>>>> Following is the stack trace that I'm getting:
>>>>
>>>> Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
>>>>  Caused by: java.lang.RuntimeException: Failed request 0.
>>>>  Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
>>>> Serialization trace:
>>>> $outer (scala.collection.convert.Wrappers$SeqWrapper)
>>>>  at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>>>>  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>>>>  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>>  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>  at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>>>  at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>>>  at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>>>  at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>>>  at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>>>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>  at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> I am not using any custom serializer of the kind Jiayi mentioned.
>>>>
>>>> Jayant Ameta
>>>>
>>>>
>>>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <bu...@163.com> wrote:
>>>>
>>>>> Hi  Jayant,
>>>>>
>>>>>   The constructor of the StateDescriptor should accept a serializer
>>>>> parameter. You can create a new serializer like this:
>>>>>
>>>>>
>>>>>    new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>>>>
>>>>>
>>>>>  By the way, can you show us the Kryo exception, as Gordon asked?
>>>>>
>>>>>
>>>>> Jiayi Liao, Best
>>>>>
>>>>>
>>>>>
>>>>>  Original Message
>>>>> *Sender:* Tzu-Li (Gordon) Tai<tz...@apache.org>
>>>>> *Recipient:* Jayant Ameta<wi...@gmail.com>; bupt_ljy<
>>>>> bupt_ljy@163.com>
>>>>> *Cc:* user<us...@flink.apache.org>
>>>>> *Date:* Thursday, Oct 25, 2018 17:18
>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>> Exception
>>>>>
>>>>> Hi Jayant,
>>>>>
>>>>> What is the Kryo exception message that you are getting?
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>>>
>>>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyameta@gmail.com)
>>>>> wrote:
>>>>>
>>>>> Hi,
>>>>> I haven't configured any serializer in the descriptor (neither in
>>>>> the Flink job nor in the state query client).
>>>>> Which serializer should I use?
>>>>>
>>>>> Jayant Ameta
>>>>>
>>>>>
>>>>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <bu...@163.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>    Your code looks right. Are you sure that you’re using the same
>>>>>> serializer as the Flink program does? Could you show the serializer
>>>>>> used in the descriptor?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Jiayi Liao, Best
>>>>>>
>>>>>>  Original Message
>>>>>> *Sender:* Jayant Ameta<wi...@gmail.com>
>>>>>> *Recipient:* user<us...@flink.apache.org>
>>>>>> *Date:* Thursday, Oct 25, 2018 14:17
>>>>>> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>>>>>>
>>>>>> I get a Kryo exception when querying the state.
>>>>>>
>>>>>> Key: UUID
>>>>>> MapState<UUID, String>
>>>>>>
>>>>>> Client code snippet:
>>>>>>
>>>>>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>>>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>>>>>
>>>>>>
>>>>>> Any better way to query it?
>>>>>>
>>>>>>
>>>>>> Jayant Ameta
>>>>>>
>>>>>

Re: Queryable state when key is UUID - getting Kyro Exception

Posted by Jayant Ameta <wi...@gmail.com>.
Hi Till,
Here is the client snippet. Here Rule is a custom POJO that I use.

public static void main(String[] args)
    throws IOException, InterruptedException, ExecutionException {
  UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

  QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
  ExecutionConfig config = new ExecutionConfig();
  client.setExecutionConfig(config);

  MapStateDescriptor<UUID, Rule> descriptor =
      new MapStateDescriptor<>("rulePatterns", UUID.class, Rule.class);
  CompletableFuture<MapState<UUID, Rule>> resultFuture =
      client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
          uuid, TypeInformation.of(UUID.class), descriptor);

  while (!resultFuture.isDone()) {
    Thread.sleep(1000);
  }
  resultFuture.whenComplete((result, throwable) -> {
    if (throwable != null) {
      throwable.printStackTrace();
    } else {
      try {
        System.out.println(result.get(uuid));
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  });
}
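
Editor's note: the snippet above polls the future once per second and only then attaches a callback. A bounded wait, like the `resultFuture.get(10, TimeUnit.SECONDS)` call in the first message of this thread, gives the same result with an explicit timeout. The sketch below uses only the JDK. The class name `BoundedWaitDemo`, the helper `await`, and the sample values are hypothetical and stand in for the real `QueryableStateClient` call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class, not part of Flink.
class BoundedWaitDemo {

  /** Waits up to timeoutMillis and returns the value or a short failure description. */
  static String await(CompletableFuture<String> future, long timeoutMillis)
      throws InterruptedException {
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (ExecutionException e) {
      // get() wraps the original failure; the cause carries the real message.
      return "failed: " + e.getCause().getMessage();
    } catch (TimeoutException e) {
      return "timed out";
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // A future that already holds a value.
    CompletableFuture<String> ok = CompletableFuture.completedFuture("rule-1");

    // A future that completed with an error, like the failed state query.
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new RuntimeException("Failed request 0."));

    // A future that never completes within the timeout.
    CompletableFuture<String> pending = new CompletableFuture<>();

    System.out.println(await(ok, 100));
    System.out.println(await(failed, 100));
    System.out.println(await(pending, 100));
  }
}
```

With this shape the caller sees a timeout, a remote failure, and a successful result as three distinct outcomes, and the thread never sleeps longer than the chosen limit.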


Below is the stack trace:

Caused by: java.lang.RuntimeException: Error while processing request with
ID 12. Caused by: java.io.IOException: Unable to deserialize key and
namespace. This indicates a mismatch in the key/namespace serializers used
by the KvState instance and this access.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
at org.apache.flink.types.StringValue.readString(StringValue.java:770)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
... 9 more

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:748)

Jayant Ameta
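
Editor's note: the trace above reports a key/namespace serializer mismatch, and its inner frames show `StringSerializer` hitting an `EOFException` while reading the key bytes. The sketch below reproduces that kind of failure with plain JDK streams, as an analogy only. It does not use Flink's or Kryo's wire formats, and the class `SerializerMismatchDemo` and its methods are hypothetical. One side writes a UUID as two longs, and the other side tries to read the same bytes as a length-prefixed string.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.UUID;

// Hypothetical demo class; the encodings here are NOT Flink's or Kryo's formats.
class SerializerMismatchDemo {

  /** Writer side: encodes a UUID as two longs (16 bytes). */
  static byte[] writeAsLongs(UUID id) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(id.getMostSignificantBits());
      out.writeLong(id.getLeastSignificantBits());
    }
    return bytes.toByteArray();
  }

  /** Matching reader: decodes the two longs back into a UUID. */
  static UUID readAsLongs(byte[] data) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      return new UUID(in.readLong(), in.readLong());
    }
  }

  /** Mismatched reader: expects a length-prefixed string in the same bytes. */
  static String readAsString(byte[] data) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      return in.readUTF();
    }
  }

  public static void main(String[] args) throws IOException {
    UUID id = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");
    byte[] data = writeAsLongs(id);

    // Same format on both sides: the round trip succeeds.
    System.out.println("round trip ok: " + readAsLongs(data).equals(id));

    // Different format on the reading side: the first two bytes are taken
    // as a string length that is far larger than the remaining 14 bytes.
    try {
      readAsString(data);
    } catch (EOFException e) {
      System.out.println("mismatched reader ran out of bytes");
    }
  }
}
```

The point of the analogy is that both sides must agree on the type and serializer of the key they exchange. When they disagree, the reader either runs out of bytes, as here, or decodes a meaningless value.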


On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <tr...@apache.org> wrote:

> Could you send us a small example program which we can use to reproduce
> the problem?
>
> Cheers,
> Till
>
> On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <wi...@gmail.com> wrote:
>
>> Yeah, it IS using Kryo serializer.
>>
>> Jayant Ameta
>>
>>
>> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Jayant, could you check that the UUID key on the TM is actually
>>> serialized using a Kryo serializer? You can do this by setting a breakpoint
>>> in the constructor of the `AbstractKeyedStateBackend`.
>>>
>>> Cheers,
>>> Till
>>>

Re: Queryable state when key is UUID - getting Kyro Exception

Posted by Till Rohrmann <tr...@apache.org>.
Could you send us a small example program which we can use to reproduce the
problem?

Cheers,
Till

On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <wi...@gmail.com> wrote:

> Yeah, it IS using Kryo serializer.
>
> Jayant Ameta
>
>
> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <tr...@apache.org> wrote:
>
>> Hi Jayant, could you check that the UUID key on the TM is actually
>> serialized using a Kryo serializer? You can do this by setting a breakpoint
>> in the constructor of the `AbstractKeyedStateBackend`.
>>
>> Cheers,
>> Till
>>

Re: Queryable state when key is UUID - getting Kyro Exception

Posted by Jayant Ameta <wi...@gmail.com>.
Yeah, it IS using Kryo serializer.

Jayant Ameta


On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Jayant, could you check that the UUID key on the TM is actually
> serialized using a Kryo serializer? You can do this by setting a breakpoint
> in the constructor of the `AbstractKeyedStateBackend`.
>
> Cheers,
> Till
>
> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <bu...@163.com> wrote:
>
>> Hi, Jayant
>>
>>     Your code looks good to me. And I’ve tried the serialize/deserialize
>> of Kryo on UUID class, it all looks okay.
>>
>>     I’m not very sure about this problem. Maybe you can write a very
>> simple demo to try if it works.
>>
>>
>> Jiayi Liao, Best
>>
>>  Original Message
>> *Sender:* Jayant Ameta<wi...@gmail.com>
>> *Recipient:* bupt_ljy<bu...@163.com>
>> *Cc:* Tzu-Li (Gordon) Tai<tz...@apache.org>; user<
>> user@flink.apache.org>
>> *Date:* Monday, Oct 29, 2018 11:53
>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>
>> Hi Jiayi,
>> Any further help on this?
>>
>> Jayant Ameta
>>
>>
>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <wi...@gmail.com>
>> wrote:
>>
>>> MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
>>>     String.class);
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <bu...@163.com> wrote:
>>>
>>>> Hi,
>>>>
>>>>    Can you show us the descriptor used in the code below?
>>>>
>>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>
>>>>
>>>> Jiayi Liao, Best
>>>>
>>>>
>>>>  Original Message
>>>> *Sender:* Jayant Ameta<wi...@gmail.com>
>>>> *Recipient:* bupt_ljy<bu...@163.com>
>>>> *Cc:* Tzu-Li (Gordon) Tai<tz...@apache.org>; user<
>>>> user@flink.apache.org>
>>>> *Date:* Friday, Oct 26, 2018 02:26
>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>> Exception
>>>>
>>>> Also, I haven't provided any custom serializer in my Flink job.
>>>> Shouldn't the same configuration work for the queryable state client?
>>>>
>>>> Jayant Ameta
>>>>
>>>>
>>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <wi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Gordon,
>>>>> Following is the stack trace that I'm getting:
>>>>>
>>>>> Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
>>>>> Caused by: java.lang.RuntimeException: Failed request 0.
>>>>> Caused by: java.lang.RuntimeException: Error while processing request with ID 0.
>>>>> Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
>>>>> Serialization trace:
>>>>> $outer (scala.collection.convert.Wrappers$SeqWrapper)
>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>     at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>>>>     at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>>>>     at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>>>>     at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>>>>     at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> I am not using any custom serializer of the kind Jiayi mentioned.
>>>>>
>>>>> Jayant Ameta
>>>>>
>>>>>
>>>>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <bu...@163.com> wrote:
>>>>>
>>>>>> Hi  Jayant,
>>>>>>
>>>>>>   The constructor of the StateDescriptor accepts a serializer
>>>>>> parameter. You can create a serializer like this:
>>>>>>
>>>>>>
>>>>>>    new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>>>>>
>>>>>>
>>>>>>  By the way, can you show us the Kryo exception, as Gordon
>>>>>> asked?
>>>>>>
>>>>>>
>>>>>> Jiayi Liao, Best
>>>>>>
>>>>>>
>>>>>>
>>>>>>  Original Message
>>>>>> *Sender:* Tzu-Li (Gordon) Tai<tz...@apache.org>
>>>>>> *Recipient:* Jayant Ameta<wi...@gmail.com>; bupt_ljy<
>>>>>> bupt_ljy@163.com>
>>>>>> *Cc:* user<us...@flink.apache.org>
>>>>>> *Date:* Thursday, Oct 25, 2018 17:18
>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>>> Exception
>>>>>>
>>>>>> Hi Jayant,
>>>>>>
>>>>>> What is the Kryo exception message that you are getting?
>>>>>>
>>>>>> Cheers,
>>>>>> Gordon
>>>>>>
>>>>>>
>>>>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyameta@gmail.com)
>>>>>> wrote:
>>>>>>
>>>>>> Hi,
>>>>>> I haven't configured a serializer in the descriptor, neither in the
>>>>>> Flink job nor in the state query client.
>>>>>> Which serializer should I use?
>>>>>>
>>>>>> Jayant Ameta
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <bu...@163.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>    Your code looks right. Are you sure that you’re using the same
>>>>>>> serializer as the Flink program does? Could you show the serializer
>>>>>>> in the descriptor?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Jiayi Liao, Best
>>>>>>>
>>>>>>>  Original Message
>>>>>>> *Sender:* Jayant Ameta<wi...@gmail.com>
>>>>>>> *Recipient:* user<us...@flink.apache.org>
>>>>>>> *Date:* Thursday, Oct 25, 2018 14:17
>>>>>>> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>>>>>>>
>>>>>>> I get a Kryo exception when querying the state.
>>>>>>>
>>>>>>> Key: UUID
>>>>>>> MapState<UUID, String>
>>>>>>>
>>>>>>> Client code snippet:
>>>>>>>
>>>>>>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>>>>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>>>>>>
>>>>>>>
>>>>>>> Any better way to query it?
>>>>>>>
>>>>>>>
>>>>>>> Jayant Ameta
>>>>>>>
>>>>>>
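
For anyone trying the "very simple demo" suggested earlier in the thread, here is a minimal sketch of a UUID round-trip check. It uses plain java.nio rather than Kryo or Flink, so it does not reproduce the exception. It only illustrates the property under discussion: the side that writes the key and the side that reads it must agree on the byte layout. The class UuidRoundTrip and its encode/decode methods are hypothetical names and are not part of Flink.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical helper, not part of Flink: a fixed 16-byte encoding for a UUID key.
final class UuidRoundTrip {

    // Writes the two 64-bit halves in big-endian order (ByteBuffer's default).
    static byte[] encode(UUID id) {
        return ByteBuffer.allocate(16)
                .putLong(id.getMostSignificantBits())
                .putLong(id.getLeastSignificantBits())
                .array();
    }

    // Reads the two halves back in the same order they were written.
    static UUID decode(byte[] bytes) {
        if (bytes.length != 16) {
            throw new IllegalArgumentException("expected 16 bytes, got " + bytes.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long most = buf.getLong();
        long least = buf.getLong();
        return new UUID(most, least);
    }

    public static void main(String[] args) {
        // Same key as in the client snippet quoted above.
        UUID key = UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d");
        UUID back = decode(encode(key));
        System.out.println("round trip ok: " + key.equals(back));
    }
}
```

A check like this passing on the client alone says nothing about the TaskManager side. In the stack trace above the failure happens in KvStateSerializer.deserializeKeyAndNamespace on the server, so the comparison that matters is between the serializer the client uses for the key and the one the job's keyed state backend uses.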