Posted to user@flink.apache.org by Velu Mitwa <ve...@gmail.com> on 2018/01/22 13:38:40 UTC
Unable to query MapState
Hi,
I am trying to query Flink's MapState from a Flink client (1.3.2). I was able
to query ValueState, but when I try to query MapState I get the following
exception:
java.io.IOException: Unconsumed bytes in the deserialized value. This indicates a mismatch in the value serializers used by the KvState instance and this access.
    at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
    at com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
    at com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
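For context on the message above: an error of this kind is raised when a deserializer finishes reading a value but bytes are still left in the buffer, which usually means the reader assumes a different layout than the writer used. The following is a minimal, self-contained Java sketch of such a check using only java.io. The class and method names are hypothetical, and the byte layout is a simplified stand-in, not Flink's actual wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical demo class; it does not use or reproduce any Flink API.
class SerializerMismatchDemo {

    // Writer side: encodes one entry as a UTF string key followed by a long value.
    static byte[] writeEntry(String key, long value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(key);
            out.writeLong(value);
        }
        return bytes.toByteArray();
    }

    // Matching reader: consumes the key and the value, so nothing is left over.
    static long readEntry(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        in.readUTF();
        long value = in.readLong();
        requireFullyConsumed(in);
        return value;
    }

    // Mismatched reader: assumes the payload is a single string, so the
    // eight bytes of the long value remain unread and the check fails.
    static String readKeyOnly(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        String key = in.readUTF();
        requireFullyConsumed(in);
        return key;
    }

    // Fails if the reader stopped before the end of the buffer.
    static void requireFullyConsumed(DataInputStream in) throws IOException {
        if (in.available() > 0) {
            throw new IOException("Unconsumed bytes in the deserialized value.");
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = writeEntry("getBalance", 42L);
        System.out.println(readEntry(data));
        try {
            readKeyOnly(data);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints the value from the matching reader and then the error message from the mismatched one. In the client code of this post, the analogous question is whether the value serializer created on the client side matches the one the job uses for the MapState.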
Flink Job's Logic

    FlinkKafkaConsumer09<MerchantApiEvent> consumer = new FlinkKafkaConsumer09<>(
        "/apps/application-stream:flink-demo", new MerchantApiSchema(), properties);

    DataStream<MerchantApiEvent> inputEventStream = env.addSource(consumer);

    DataStream<Tuple3<String, String, Long>> outputStream =
        inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(120), Time.milliseconds(1000)))
            .sum(2);

    DataStream<Long> output = outputStream.keyBy(0).flatMap(new CountEvent());

    output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);

    // execute program
    env.execute("Filter Transformation Example");
  }

  public static class CreateTuple
      implements MapFunction<MerchantApiEvent, Tuple3<String, String, Long>> {
    @Override
    public Tuple3<String, String, Long> map(MerchantApiEvent input) throws Exception {
      return new Tuple3<String, String, Long>(input.getMerchantId(), input.getApiName(), 1L);
    }
  }

  public static class CountEvent extends RichFlatMapFunction<Tuple3<String, String, Long>, Long> {

    private transient MapState<String, Long> mapState;

    @Override
    public void flatMap(Tuple3<String, String, Long> input, Collector<Long> out) throws Exception {
      mapState.put(input.f1, input.f2);
    }

    @Override
    public void open(Configuration config) {
      MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<String, Long>(
          "mapQuery", TypeInformation.of(new TypeHint<String>() {
          }), TypeInformation.of(new TypeHint<Long>() {
          }));
      mapStateDesc.setQueryable("mapQuery");
      mapState = getRuntimeContext().getMapState(mapStateDesc);
    }
  }
Flink Query Client's Logic

    final JobID jobId = JobID.fromHexString(jobIdParam);

    String key = queryStateRequestDto.getKey();

    final Configuration config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, jobManagerHost);
    config.setInteger(JobManagerOptions.PORT, jobManagerPort);

    HighAvailabilityServices highAvailabilityServices = null;
    try {
      highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
          config, Executors.newSingleThreadScheduledExecutor(),
          HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }

    try {
      QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);

      final TypeSerializer<String> keySerializer = TypeInformation.of(new TypeHint<String>() {
      }).createSerializer(new ExecutionConfig());
      final TypeSerializer<Map<String, Long>> valueSerializer =
          TypeInformation.of(new TypeHint<Map<String, Long>>() {
          }).createSerializer(new ExecutionConfig());

      final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(key,
          keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

      scala.concurrent.Future<byte[]> serializedResult =
          client.getKvState(jobId, "mapQuery", key.hashCode(), serializedKey);

      // now wait for the result and return it
      final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
      byte[] serializedValue = Await.result(serializedResult, duration);
      Map<String, Long> value =
          KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
      System.out.println(value);
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
Re: Unable to query MapState
Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Velu,
I would recommend switching to Flink 1.4, as queryable state has been refactored there to be compatible with all types of state, and a lot of things have been simplified.
You can read more here: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
For an example, you can use this link:
https://github.com/apache/flink/blob/a3fd548e9c76c67609bbf159d5fb743d756450b1/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java#L804
which is taken directly from the Queryable State IT cases.
Thanks,
Kostas