You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Velu Mitwa <ve...@gmail.com> on 2018/01/22 13:38:40 UTC

Unable to query MapState

Hi,
I am trying to query Flink's MapState from a Flink client (1.3.2). I was able
to query ValueState, but when I try to query MapState I get the following
exception:

java.io.IOException: Unconsumed bytes in the deserialized value. This
indicates a mismatch in the value serializers used by the KvState instance
and this access.
        at
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
        at
com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
        at
com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at
org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
        at
org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
        at
org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
        at
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
        at
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)

*Flink Job's Logic*

   * FlinkKafkaConsumer09<MerchantApiEvent> consumer = new
FlinkKafkaConsumer09<>(*
*        "/apps/application-stream:flink-demo", new MerchantApiSchema(),
properties);*

*    DataStream<MerchantApiEvent> inputEventStream =
env.addSource(consumer);*

*    DataStream<Tuple3<String, String, Long>> outputStream =*
*        inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)*
*            .window(SlidingProcessingTimeWindows.of(Time.seconds(120),
Time.milliseconds(1000)))*
*            .sum(2);*

*    DataStream<Long> output = outputStream.keyBy(0).flatMap(new
CountEvent());*

*    output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);*

*    // execute program*
*    env.execute("Filter Transformation Example");*

*  }*


*  public static class CreateTuple*
*      implements MapFunction<MerchantApiEvent, Tuple3<String, String,
Long>> {*
*    @Override*
*    public Tuple3<String, String, Long> map(MerchantApiEvent input) throws
Exception {*
*      return new Tuple3<String, String, Long>(input.getMerchantId(),
input.getApiName(), 1L);*
*    }*

*  }*

*  public static class CountEvent extends
RichFlatMapFunction<Tuple3<String, String, Long>, Long> {*

*    private transient MapState<String, Long> mapState;*

*    @Override*
*    public void flatMap(Tuple3<String, String, Long> input,
Collector<Long> out) throws Exception {*

*      mapState.put(input.f1, input.f2);*

*    }*

*    @Override*
*    public void open(Configuration config) {*

*      MapStateDescriptor<String, Long> mapStateDesc = new
MapStateDescriptor<String, Long>(*
*          "mapQuery", TypeInformation.of(new TypeHint<String>() {*
*          }), TypeInformation.of(new TypeHint<Long>() {*
*          }));*
*      mapStateDesc.setQueryable("mapQuery");*

*      mapState = getRuntimeContext().getMapState(mapStateDesc);*

*    }*
*  }*


*Flink Query Client's Logic*

    *final JobID jobId = JobID.fromHexString(jobIdParam);*

*    String key = queryStateRequestDto.getKey();*

*    final Configuration config = new Configuration();*
*    config.setString(JobManagerOptions.ADDRESS, jobManagerHost);*
*    config.setInteger(JobManagerOptions.PORT, jobManagerPort);*

*    HighAvailabilityServices highAvailabilityServices = null;*
*    try {*
*      highAvailabilityServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(*
*          config, Executors.newSingleThreadScheduledExecutor(),*
*
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);*
*    } catch (Exception e) {*
*      // TODO Auto-generated catch block*
*      e.printStackTrace();*
*    }*

*    try {*
*      QueryableStateClient client = new QueryableStateClient(config,
highAvailabilityServices);*

*      final TypeSerializer<String> keySerializer = TypeInformation.of(new
TypeHint<String>() {*
*      }).createSerializer(new ExecutionConfig());*
*      final TypeSerializer<Map<String, Long>> valueSerializer =*
*          TypeInformation.of(new TypeHint<Map<String, Long>>() {*
*          }).createSerializer(new ExecutionConfig());*

*      final byte[] serializedKey =
KvStateRequestSerializer.serializeKeyAndNamespace(key,*
*          keySerializer, VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE);*

*      scala.concurrent.Future<byte[]> serializedResult =*
*          client.getKvState(jobId, "mapQuery", key.hashCode(),
serializedKey);*

*      // now wait for the result and return it*
*      final FiniteDuration duration = new FiniteDuration(1,
TimeUnit.SECONDS);*
*      byte[] serializedValue = Await.result(serializedResult, duration);*
*      Map<String, Long> value =*
*          KvStateRequestSerializer.deserializeValue(serializedValue,
valueSerializer);*
*      System.out.println(value);*
*    } catch (Exception e) {*
*      // TODO Auto-generated catch block*
*      e.printStackTrace();*
*    }*

Re: Unable to query MapState

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Velu,

I would recommend switching to Flink 1.4, as queryable state has been refactored to be compatible with all types of state.
You can read more here: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
In addition, a lot of things have been simplified.

For an example, you can use this link:
https://github.com/apache/flink/blob/a3fd548e9c76c67609bbf159d5fb743d756450b1/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java#L804

which is taken directly from the Queryable State IT cases.

Thanks,
Kostas

> On Jan 22, 2018, at 2:38 PM, Velu Mitwa <ve...@gmail.com> wrote:
> 
> Hi,
> I am trying to query Flink's MapState from Flink client (1.3.2). I was able to query ValueState but when I tried to query MapState I am getting an exception. 
> 
> java.io.IOException: Unconsumed bytes in the deserialized value. This indicates a mismatch in the value serializers used by the KvState instance and this access.
>         at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
>         at com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
>         at com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
>         at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
>         at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
>         at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
>         at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
> 
> Flink Job's Logic
> 
>     FlinkKafkaConsumer09<MerchantApiEvent> consumer = new FlinkKafkaConsumer09<>(
>         "/apps/application-stream:flink-demo", new MerchantApiSchema(), properties);
> 
>     DataStream<MerchantApiEvent> inputEventStream = env.addSource(consumer);
> 
>     DataStream<Tuple3<String, String, Long>> outputStream =
>         inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)
>             .window(SlidingProcessingTimeWindows.of(Time.seconds(120), Time.milliseconds(1000)))
>             .sum(2);
> 
>     DataStream<Long> output = outputStream.keyBy(0).flatMap(new CountEvent());
> 
>     output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);
> 
>     // execute program
>     env.execute("Filter Transformation Example");
> 
>   }
> 
> 
>   public static class CreateTuple
>       implements MapFunction<MerchantApiEvent, Tuple3<String, String, Long>> {
>     @Override
>     public Tuple3<String, String, Long> map(MerchantApiEvent input) throws Exception {
>       return new Tuple3<String, String, Long>(input.getMerchantId(), input.getApiName(), 1L);
>     }
> 
>   }
> 
>   public static class CountEvent extends RichFlatMapFunction<Tuple3<String, String, Long>, Long> {
> 
>     private transient MapState<String, Long> mapState;
> 
>     @Override
>     public void flatMap(Tuple3<String, String, Long> input, Collector<Long> out) throws Exception {
> 
>       mapState.put(input.f1, input.f2);
> 
>     }
> 
>     @Override
>     public void open(Configuration config) {
> 
>       MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<String, Long>(
>           "mapQuery", TypeInformation.of(new TypeHint<String>() {
>           }), TypeInformation.of(new TypeHint<Long>() {
>           }));
>       mapStateDesc.setQueryable("mapQuery");
> 
>       mapState = getRuntimeContext().getMapState(mapStateDesc);
> 
>     }
>   }
> 
> 
> Flink Query Client's Logic
> 
>     final JobID jobId = JobID.fromHexString(jobIdParam);
> 
>     String key = queryStateRequestDto.getKey();
> 
>     final Configuration config = new Configuration();
>     config.setString(JobManagerOptions.ADDRESS, jobManagerHost);
>     config.setInteger(JobManagerOptions.PORT, jobManagerPort);
> 
>     HighAvailabilityServices highAvailabilityServices = null;
>     try {
>       highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
>           config, Executors.newSingleThreadScheduledExecutor(),
>           HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>     } catch (Exception e) {
>       // TODO Auto-generated catch block
>       e.printStackTrace();
>     }
> 
>     try {
>       QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
> 
>       final TypeSerializer<String> keySerializer = TypeInformation.of(new TypeHint<String>() {
>       }).createSerializer(new ExecutionConfig());
>       final TypeSerializer<Map<String, Long>> valueSerializer =
>           TypeInformation.of(new TypeHint<Map<String, Long>>() {
>           }).createSerializer(new ExecutionConfig());
> 
>       final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(key,
>           keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
> 
>       scala.concurrent.Future<byte[]> serializedResult =
>           client.getKvState(jobId, "mapQuery", key.hashCode(), serializedKey);
> 
>       // now wait for the result and return it
>       final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
>       byte[] serializedValue = Await.result(serializedResult, duration);
>       Map<String, Long> value =
>           KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
>       System.out.println(value);
>     } catch (Exception e) {
>       // TODO Auto-generated catch block
>       e.printStackTrace();
>     }