You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Alexis Sarda-Espinosa <al...@microfocus.com> on 2022/04/08 07:58:42 UTC

Using state processor API to read state defined with a TypeHint

Hi everyone,

I have a ProcessWindowFunction that uses Global window state. It uses MapState with a descriptor defined like this:

MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
        "descriptorName",
        TypeInformation.of(Long.class),
        TypeInformation.of(new TypeHint<List<String>>() {})
);

Now I'm trying to access a checkpoint's state data to read that (created with RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the same descriptor and calls this in readWindow:

MapState<Long, List<String>> mapState = context.globalState().getMapState(msd);

After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to configure the reader function like this:

savepoint
        .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
        .process(
                "my-uid",
                new StateReaderFunction(),
                Types.STRING,
                TypeInformation.of(MyPojo.class),
                Types.INT
        )
        .print();

But I am getting this exception:

Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).

Does someone know what I'm doing wrong in my setup?

Regards,
Alexis.


RE: Using state processor API to read state defined with a TypeHint

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
Hi Roman,

Thanks for the quick response. It wasn't that, but your comment about erasure made me realize I should have debugged the code and looked at the types. Apparently setting TTL changes the serializer, so I also had to add TTL in the WindowReaderFunction.

Regards,
Alexis.

-----Original Message-----
From: Roman Khachatryan <ro...@apache.org> 
Sent: Freitag, 8. April 2022 11:48
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Using state processor API to read state defined with a TypeHint

Hi Alexis,

I think your setup is fine, but probably Java type erasure makes Flink consider the two serializers as different.
Could you try creating a MapStateDescriptor by explicitly providing serializers (constructed manually)?

Regards,
Roman


On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
>
> Hi everyone,
>
>
>
> I have a ProcessWindowFunction that uses Global window state. It uses MapState with a descriptor defined like this:
>
>
>
> MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
>
>         "descriptorName",
>
>         TypeInformation.of(Long.class),
>
>         TypeInformation.of(new TypeHint<List<String>>() {})
>
> );
>
>
>
> Now I’m trying to access a checkpoint’s state data to read that (created with RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the same descriptor and calls this in readWindow:
>
>
>
> MapState<Long, List<String>> mapState = 
> context.globalState().getMapState(msd);
>
>
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to configure the reader function like this:
>
>
>
> savepoint
>
>         .window(SlidingEventTimeWindows.of(Time.minutes(11L), 
> Time.minutes(1L)))
>
>         .process(
>
>                 "my-uid",
>
>                 new StateReaderFunction(),
>
>                 Types.STRING,
>
>                 TypeInformation.of(MyPojo.class),
>
>                 Types.INT
>
>         )
>
>         .print();
>
>
>
> But I am getting this exception:
>
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
>
>
> Does someone know what I’m doing wrong in my setup?
>
>
>
> Regards,
>
> Alexis.
>
>

Re: Using state processor API to read state defined with a TypeHint

Posted by Roman Khachatryan <ro...@apache.org>.
Hi Alexis,

I think your setup is fine, but probably Java type erasure makes Flink
consider the two serializers as different.
Could you try creating a MapStateDescriptor by explicitly providing
serializers (constructed manually)?

Regards,
Roman


On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa
<al...@microfocus.com> wrote:
>
> Hi everyone,
>
>
>
> I have a ProcessWindowFunction that uses Global window state. It uses MapState with a descriptor defined like this:
>
>
>
> MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
>
>         "descriptorName",
>
>         TypeInformation.of(Long.class),
>
>         TypeInformation.of(new TypeHint<List<String>>() {})
>
> );
>
>
>
> Now I’m trying to access a checkpoint’s state data to read that (created with RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the same descriptor and calls this in readWindow:
>
>
>
> MapState<Long, List<String>> mapState = context.globalState().getMapState(msd);
>
>
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to configure the reader function like this:
>
>
>
> savepoint
>
>         .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>
>         .process(
>
>                 "my-uid",
>
>                 new StateReaderFunction(),
>
>                 Types.STRING,
>
>                 TypeInformation.of(MyPojo.class),
>
>                 Types.INT
>
>         )
>
>         .print();
>
>
>
> But I am getting this exception:
>
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
>
>
> Does someone know what I’m doing wrong in my setup?
>
>
>
> Regards,
>
> Alexis.
>
>