You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Alexis Sarda-Espinosa <al...@microfocus.com> on 2022/04/08 07:58:42 UTC
Using state processor API to read state defined with a TypeHint
Hi everyone,
I have a ProcessWindowFunction that uses Global window state. It uses MapState with a descriptor defined like this:
MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
"descriptorName",
TypeInformation.of(Long.class),
TypeInformation.of(new TypeHint<List<String>>() {})
);
Now I'm trying to access a checkpoint's state data to read that (created with RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the same descriptor and calls this in readWindow:
MapState<Long, List<String>> mapState = context.globalState().getMapState(msd);
After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to configure the reader function like this:
savepoint
.window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
.process(
"my-uid",
new StateReaderFunction(),
Types.STRING,
TypeInformation.of(MyPojo.class),
Types.INT
)
.print();
But I am getting this exception:
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
Does someone know what I'm doing wrong in my setup?
Regards,
Alexis.
RE: Using state processor API to read state defined with a TypeHint
Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
Hi Roman,
Thanks for the quick response. It wasn't that, but your comment about erasure made me realize I should have debugged the code and looked at the types. Apparently setting TTL changes the serializer, so I also had to add TTL in the WindowReaderFunction.
Regards,
Alexis.
-----Original Message-----
From: Roman Khachatryan <ro...@apache.org>
Sent: Freitag, 8. April 2022 11:48
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Using state processor API to read state defined with a TypeHint
Hi Alexis,
I think your setup is fine, but probably Java type erasure makes Flink consider the two serializers as different.
Could you try creating a MapStateDescriptor by explicitly providing serializers (constructed manually)?
Regards,
Roman
On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
>
> Hi everyone,
>
>
>
> I have a ProcessWindowFunction that uses Global window state. It uses MapState with a descriptor defined like this:
>
>
>
> MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
>
> "descriptorName",
>
> TypeInformation.of(Long.class),
>
> TypeInformation.of(new TypeHint<List<String>>() {})
>
> );
>
>
>
> Now I’m trying to access a checkpoint’s state data to read that (created with RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the same descriptor and calls this in readWindow:
>
>
>
> MapState<Long, List<String>> mapState =
> context.globalState().getMapState(msd);
>
>
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to configure the reader function like this:
>
>
>
> savepoint
>
> .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> Time.minutes(1L)))
>
> .process(
>
> "my-uid",
>
> new StateReaderFunction(),
>
> Types.STRING,
>
> TypeInformation.of(MyPojo.class),
>
> Types.INT
>
> )
>
> .print();
>
>
>
> But I am getting this exception:
>
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
>
>
> Does someone know what I’m doing wrong in my setup?
>
>
>
> Regards,
>
> Alexis.
>
>
Re: Using state processor API to read state defined with a TypeHint
Posted by Roman Khachatryan <ro...@apache.org>.
Hi Alexis,
I think your setup is fine, but probably Java type erasure makes Flink
consider the two serializers as different.
Could you try creating a MapStateDescriptor by explicitly providing
serializers (constructed manually)?
Regards,
Roman
On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa
<al...@microfocus.com> wrote:
>
> Hi everyone,
>
>
>
> I have a ProcessWindowFunction that uses Global window state. It uses MapState with a descriptor defined like this:
>
>
>
> MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
>
> "descriptorName",
>
> TypeInformation.of(Long.class),
>
> TypeInformation.of(new TypeHint<List<String>>() {})
>
> );
>
>
>
> Now I’m trying to access a checkpoint’s state data to read that (created with RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the same descriptor and calls this in readWindow:
>
>
>
> MapState<Long, List<String>> mapState = context.globalState().getMapState(msd);
>
>
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to configure the reader function like this:
>
>
>
> savepoint
>
> .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>
> .process(
>
> "my-uid",
>
> new StateReaderFunction(),
>
> Types.STRING,
>
> TypeInformation.of(MyPojo.class),
>
> Types.INT
>
> )
>
> .print();
>
>
>
> But I am getting this exception:
>
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
>
>
> Does someone know what I’m doing wrong in my setup?
>
>
>
> Regards,
>
> Alexis.
>
>