Posted to user@flink.apache.org by Joshua Fan <jo...@gmail.com> on 2021/12/28 09:02:46 UTC

Mapstate got wrong UK when restored.

Hi all,
My Flink version is 1.11 and the state backend is RocksDB. I want to write
a Flink job that implements an adaptive window. The job graph looks like this:

> DataStream<DataEntity> entities = env.addSource(new EntitySource()).setParallelism(1);
>
> entities.keyBy(DataEntity::getName).process(new EntityKeyedProcessFunction()).setParallelism(p);

The important code is in EntityKeyedProcessFunction, which is attached. It
holds a MapState declared as 'private transient MapState<String, SubState>
entityStates;'.
I print the content of the MapState whenever a checkpoint completes. Before a
restart the content is correct, as shown below:

> 2021-12-28 16:22:05,487 INFO window.EntityKeyedProcessFunction [] - >>>>>>>>>the key is 1640679600000
> 2021-12-28 16:22:05,487 INFO window.EntityKeyedProcessFunction [] - the value is name = hippopotamus, value = [window.DataEntity@b9266a0, window.DataEntity@682fce90]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the value is name = crocodile, value = [window.DataEntity@16d20045]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the value is name = dolphin, value = [window.DataEntity@1820c75a, window.DataEntity@64f3b9f6]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the value is name = Dragonfly, value = [window.DataEntity@2b2ad03]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the value is name = Hedgehog, value = [window.DataEntity@65f39671, window.DataEntity@2df6b2bf]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the value is name = Bee, value = [window.DataEntity@13249998]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the value is name = cicada, value = [window.DataEntity@7266e125, window.DataEntity@167cf1ae]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the value is name = Mosquito, value = [window.DataEntity@2596aa5a, window.DataEntity@603c0804]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the value is name = Crane, value = [window.DataEntity@2a3192e9, window.DataEntity@3a65398f]

The key of the MapState is a timestamp truncated to the minute.
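
For readers following along, truncating to the minute could look roughly like the sketch below. It assumes the timestamp is in epoch milliseconds, which matches the key 1640679600000 in the log above; the class and method names are hypothetical and not taken from the attached code.

```java
// Hypothetical helper, not part of the original job.
final class MinuteBucket {
    static final long MILLIS_PER_MINUTE = 60_000L;

    // Rounds an epoch-millisecond timestamp down to the start of its minute.
    static long truncateToMinute(long epochMillis) {
        return epochMillis - Math.floorMod(epochMillis, MILLIS_PER_MINUTE);
    }

    public static void main(String[] args) {
        // 12.345 s into the minute that starts at 1640679600000
        System.out.println(truncateToMinute(1640679612345L)); // prints 1640679600000
    }
}
```

Math.floorMod keeps the result correct for negative timestamps as well, where the plain % operator would round towards zero.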

But when the job restarted from a checkpoint, the content of the MapState
changed; more precisely, the keys of the MapState changed. It printed the following:

> 2021-12-28 16:15:45,379 INFO window.EntityKeyedProcessFunction [] - >>>>>>>>>the key is carp
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the value is name = hippopotamus, value = [window.DataEntity@510d4c4b, window.DataEntity@7857e387]
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the value is name = crocodile, value = [window.DataEntity@31366a33, window.DataEntity@a62074a]
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the value is name = Dragonfly, value = [window.DataEntity@56db63fa, window.DataEntity@54befce0, window.DataEntity@4e7cf96a]
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the value is name = Hedgehog, value = [window.DataEntity@7ad09313, window.DataEntity@592a2955]
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the value is name = Mosquito, value = [window.DataEntity@48c05cae]
> 2021-12-28 16:15:45,392 INFO window.EntityKeyedProcessFunction [] - the value is name = duck, value = [window.DataEntity@3e9ef1a4]
> 2021-12-28 16:15:45,392 INFO window.EntityKeyedProcessFunction [] - the value is name = Rhinoceros, value = [window.DataEntity@25f11701, window.DataEntity@2334b667]
> 2021-12-28 16:15:45,392 INFO window.EntityKeyedProcessFunction [] - the value is name = Eagle, value = [window.DataEntity@7574eb4a]

It looks as if the key of the restored MapState is the key of the operator:
my minute timestamp is gone, replaced by the operator key.
This is very strange. Am I misusing MapState?
Thanks.

Yours
Josh

Re: Mapstate got wrong UK when restored.

Posted by Joshua Fan <jo...@gmail.com>.
Hi David,

Thanks a lot.
I think I mostly get the point now. When I use initializeState to restore the
MapState, the task has no current key at that moment, so I only get the key
and not the UK. When I use the MapState in processElement, a key is provided
implicitly, so I get the right UK and UV. Still, I think I should
get <key, <UK, UV>> in initializeState rather than <key, UV>. Anyway, I
have changed my code to use only the MapState provided by Flink.
Thanks.
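
The "key is provided implicitly" idea can be pictured with a small plain-Java model. This is only a sketch of the semantics, not Flink's implementation: the class KeyScopedMapModel and all of its methods are hypothetical. The model chooses to throw when no current key is set; a real state backend may behave differently, as the restored output earlier in this thread suggests.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Plain-Java model of key-scoped map state; NOT Flink's implementation.
final class KeyScopedMapModel<K, UK, UV> {
    private final Map<K, Map<UK, UV>> store = new HashMap<>();
    private K currentKey; // stays null outside of per-record processing

    // In this model, the runtime would call this before handing over each record.
    void setCurrentKey(K key) { currentKey = key; }

    void put(UK userKey, UV value) { scoped().put(userKey, value); }

    Set<UK> userKeys() { return scoped().keySet(); }

    // Resolves the inner map for the current key, or fails if there is none.
    private Map<UK, UV> scoped() {
        if (currentKey == null) {
            throw new IllegalStateException("no current key: state cannot be resolved");
        }
        return store.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    public static void main(String[] args) {
        KeyScopedMapModel<String, Long, String> state = new KeyScopedMapModel<>();
        state.setCurrentKey("carp");
        state.put(1640679600000L, "window-a");
        state.setCurrentKey("duck");
        System.out.println(state.userKeys()); // [] because "duck" has no entries yet
        state.setCurrentKey("carp");
        System.out.println(state.userKeys()); // [1640679600000]
    }
}
```

In this picture a restore hook such as initializeState runs before any record arrives, so there is no current key yet and the per-key map cannot be resolved there.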

Yours sincerely
Josh


On Wed, Dec 29, 2021 at 23:01, David Morávek <dm...@apache.org> wrote:

> The problem is that you're not actually using the underlying state during
> runtime, but instead you're simply using a java map abstraction. This
> property ("Map<Long, SubState> state") is simply bound to the UDF lifecycle
> and doesn't share the semantics of the keyed state.
>
> You should be using the "MapState" property directly to get the guarantees
> you're looking for. Then you also won't need to override the snapshot /
> initialize state methods, which simplifies the code a lot.
>
> D.

Re: Mapstate got wrong UK when restored.

Posted by David Morávek <dm...@apache.org>.
The problem is that you're not actually using the underlying state during
runtime, but instead you're simply using a java map abstraction. This
property ("Map<Long, SubState> state") is simply bound to the UDF lifecycle
and doesn't share the semantics of the keyed state.

You should be using the "MapState" property directly to get the guarantees
you're looking for. Then you also won't need to override the snapshot /
initialize state methods, which simplifies the code a lot.
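
To illustrate the difference between a plain Java map held in a field and key-scoped state, here is a minimal plain-Java sketch. It only models the semantics and is not Flink code; the class FieldVersusKeyedState and its members are hypothetical. In a real job the keyed variant would be obtained with getRuntimeContext().getMapState(...) and a MapStateDescriptor rather than modelled by hand.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: contrasts a per-instance field with per-key state.
final class FieldVersusKeyedState {
    // Plain field: one map per function instance, shared by every key
    // that is routed to that instance.
    final Map<Long, String> fieldMap = new HashMap<>();

    // Model of keyed state: a separate inner map for each key.
    final Map<String, Map<Long, String>> keyedModel = new HashMap<>();

    void process(String key, long minute, String value) {
        fieldMap.put(minute, value); // entries of different keys collide here
        keyedModel.computeIfAbsent(key, k -> new HashMap<>()).put(minute, value);
    }

    public static void main(String[] args) {
        FieldVersusKeyedState f = new FieldVersusKeyedState();
        f.process("carp", 1640679600000L, "a");
        f.process("duck", 1640679600000L, "b");
        System.out.println(f.fieldMap.size());   // 1: "b" overwrote "a"
        System.out.println(f.keyedModel.size()); // 2: one inner map per key
    }
}
```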

D.

On Wed, Dec 29, 2021 at 2:08 PM Joshua Fan <jo...@gmail.com> wrote:

> Hi David,
> Thanks for your reply.
> Yes, for keyed state, every state is referenced by a particular key, but I
> would guess this is a Flink SDK issue. I mean, the keyed state may be saved
> as (key, keyed state); in my situation, that is (key, mapstate(UK, UV)).
> I think the key of this pair is not easy for the user to get: when I call
> mapstate.keyset I want to get the set of UKs, not the set of keys. In my
> job, the (key, mapstate(UK, UV)) pair is read correctly while the job is
> running, but when the job restarts from a checkpoint, the restored pair
> seems to have changed to (key, UV): the UK is simply gone and I cannot
> recover it. I think the key of (key, mapstate(UK, UV)) is added implicitly
> by Flink when reading from or writing to the state.
> So I am still not clear why I get the key but not the UK.
>
> Yours
> Josh

Re: Mapstate got wrong UK when restored.

Posted by Joshua Fan <jo...@gmail.com>.
Hi David,
Thanks for your reply.
Yes, for keyed state, every state is referenced by a particular key, but I
would guess this is a Flink SDK issue. I mean, the keyed state may be saved
as (key, keyed state); in my situation, that is (key, mapstate(UK, UV)).
I think the key of this pair is not easy for the user to get: when I call
mapstate.keyset I want to get the set of UKs, not the set of keys. In my
job, the (key, mapstate(UK, UV)) pair is read correctly while the job is
running, but when the job restarts from a checkpoint, the restored pair
seems to have changed to (key, UV): the UK is simply gone and I cannot
recover it. I think the key of (key, mapstate(UK, UV)) is added implicitly
by Flink when reading from or writing to the state.
So I am still not clear why I get the key but not the UK.

Yours
Josh

On Wed, Dec 29, 2021 at 17:32, David Morávek <dm...@apache.org> wrote:

> Hi Josh,
>
> The important bit to understand is that MapState (or any other keyed
> state) is scoped per *key* [1]. You can think of it this way: for
> each key you have a separate "map" that backs it. This is an important
> concept behind distributed stream processing: it allows you to
> parallelize the computation and still make sure that all data for the same
> key ends up in the same partition.
>
> Does this answer your question?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#keyed-state
>
> Best,
> D.
>

Re: Mapstate got wrong UK when restored.

Posted by David Morávek <dm...@apache.org>.
Hi Josh,

The important bit to understand is that MapState (or any other keyed
state) is scoped per *key* [1]. You can think of it this way: for
each key you have a separate "map" that backs it. This is an important
concept behind distributed stream processing: it allows you to
parallelize the computation and still make sure that all data for the same
key ends up in the same partition.
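
As a rough illustration of "all data for the same key ends up in the same partition", the sketch below maps a key to a key group and then to a subtask. It is deliberately simplified and the names are hypothetical: to my understanding Flink additionally applies a murmur hash to key.hashCode() before taking the modulo, so the concrete indices here will not match a real job. The point is only that the mapping is a pure function of the key.

```java
// Simplified, hypothetical sketch of key -> key group -> subtask routing.
final class KeyRoutingSketch {
    // Assumption: plain floorMod instead of the extra hashing a real runtime applies.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Spreads the key groups evenly over the parallel subtasks.
    static int subtaskFor(Object key, int parallelism, int maxParallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int first = subtaskFor("carp", 4, 128);
        int second = subtaskFor("carp", 4, 128);
        System.out.println(first == second); // prints true: same key, same subtask
    }
}
```

Because the per-key "map" lives on whichever subtask owns the key, state and records for that key always meet in the same place.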

Does this answer your question?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#keyed-state

Best,
D.