Posted to issues@flink.apache.org by "Yue Ma (Jira)" <ji...@apache.org> on 2022/04/13 11:01:00 UTC

[jira] [Updated] (FLINK-27218) Serializer in OperatorState has not been updated when new Serializers are NOT incompatible

     [ https://issues.apache.org/jira/browse/FLINK-27218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yue Ma updated FLINK-27218:
---------------------------
    Description: 
Currently, OperatorState such as *BroadcastState* or *PartitionableListState* can only be constructed via {*}DefaultOperatorStateBackend{*}. But when the Serializer of *BroadcastState* or *PartitionableListState* changes, the following problem seems to occur.

As an example, consider how PartitionableListState is initialized.

First, during the restore operation, a restored PartitionableListState is constructed from the information in the snapshot.

Then the StateMetaInfo in partitionableListState is updated with the following code:
{code:java}
TypeSerializerSchemaCompatibility<S> stateCompatibility =
                restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(newPartitionStateSerializer);

partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);{code}
The main problem is that *PartitionableListState* also holds an *internalListCopySerializer*, which is built from the previous Serializer and is not updated at this point.

But internalListCopySerializer is still used later.

Therefore, when the StateMetaInfo is updated, the internalListCopySerializer also needs to be updated.

This problem also exists in BroadcastState
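
The behaviour described above can be modeled with a small self-contained sketch. It does not use or reproduce Flink code: every type and method name in it (ElementSerializer, ListCopySerializer, MetaInfo, ListState, tagAfterRestore, and the refreshOnUpdate flag) is a hypothetical stand-in, and it assumes the derived copy serializer is built once in the constructor, as the description suggests. It only illustrates why a serializer derived from the meta info goes stale unless it is rebuilt when the meta info is replaced.

```java
/** Hypothetical stand-ins only; none of these types are Flink classes. */
final class StaleCopySerializerSketch {

    /** Stand-in for an element serializer; the tag just identifies the instance. */
    static final class ElementSerializer {
        final String tag;

        ElementSerializer(String tag) {
            this.tag = tag;
        }
    }

    /** Stand-in for a list serializer derived from an element serializer. */
    static final class ListCopySerializer {
        final ElementSerializer element;

        ListCopySerializer(ElementSerializer element) {
            this.element = element;
        }
    }

    /** Stand-in for the meta info that carries the current element serializer. */
    static final class MetaInfo {
        ElementSerializer elementSerializer;

        MetaInfo(ElementSerializer elementSerializer) {
            this.elementSerializer = elementSerializer;
        }
    }

    /** Stand-in for a list state that caches a serializer derived from its meta info. */
    static final class ListState {
        private MetaInfo metaInfo;
        private ListCopySerializer copySerializer;
        private final boolean refreshOnUpdate;

        ListState(MetaInfo metaInfo, boolean refreshOnUpdate) {
            this.metaInfo = metaInfo;
            this.refreshOnUpdate = refreshOnUpdate;
            // Assumption: the derived serializer is built once, from the serializer known here.
            this.copySerializer = new ListCopySerializer(metaInfo.elementSerializer);
        }

        void setStateMetaInfo(MetaInfo newMetaInfo) {
            this.metaInfo = newMetaInfo;
            if (refreshOnUpdate) {
                // Proposed behaviour: rebuild the derived serializer together with the meta info.
                this.copySerializer = new ListCopySerializer(newMetaInfo.elementSerializer);
            }
        }

        String copySerializerTag() {
            return copySerializer.element.tag;
        }
    }

    /** Restores a state with the "old" serializer, then switches the meta info to "new". */
    static String tagAfterRestore(boolean refreshOnUpdate) {
        MetaInfo restored = new MetaInfo(new ElementSerializer("old"));
        ListState state = new ListState(restored, refreshOnUpdate);
        // Plays the role of updating the serializer inside the restored meta info.
        restored.elementSerializer = new ElementSerializer("new");
        state.setStateMetaInfo(restored);
        return state.copySerializerTag();
    }

    public static void main(String[] args) {
        System.out.println("without refresh: " + tagAfterRestore(false)); // prints "without refresh: old"
        System.out.println("with refresh: " + tagAfterRestore(true)); // prints "with refresh: new"
    }
}
```

Without the refresh, the cached copy serializer still wraps the old element serializer even though the meta info already carries the new one; rebuilding it inside setStateMetaInfo keeps the two in sync.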

  was:
Now OperatorState can only be constructed via DefaultOperatorStateBackend. But when *BroadcastState* or *PartitionableListState* Serializer changes, it seems to have the following problems.

As an example, we can see how PartitionableListState is initialized.

First, we will construct a restored PartitionableListState based on the information in the snapshot during the restoreOperation.

Then we will update the StateMetaInfo in partitionableListState as the following code

 
{code:java}
TypeSerializerSchemaCompatibility<S> stateCompatibility =
                restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(newPartitionStateSerializer);

partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);{code}
 


The main problem is that there is also an internalListCopySerializer in PartitionableListState that is built using the previous Serializer and it has not been updated. 
But internalListCopySerializer will be used later when making the snapshot. Therefore, when we update the StateMetaInfo, the internalListCopySerializer also needs to be updated.

This problem also exists in BroadcastState


> Serializer in OperatorState has not been updated when new Serializers are NOT incompatible
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27218
>                 URL: https://issues.apache.org/jira/browse/FLINK-27218
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.15.1
>            Reporter: Yue Ma
>            Priority: Major
>         Attachments: image-2022-04-13-14-50-10-921.png
>
>
> Currently, OperatorState such as *BroadcastState* or *PartitionableListState* can only be constructed via {*}DefaultOperatorStateBackend{*}. But when the Serializer of *BroadcastState* or *PartitionableListState* changes, the following problem seems to occur.
> As an example, consider how PartitionableListState is initialized.
> First, during the restore operation, a restored PartitionableListState is constructed from the information in the snapshot.
> Then the StateMetaInfo in partitionableListState is updated with the following code:
> {code:java}
> TypeSerializerSchemaCompatibility<S> stateCompatibility =
>                 restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(newPartitionStateSerializer);
> partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);{code}
> The main problem is that *PartitionableListState* also holds an *internalListCopySerializer*, which is built from the previous Serializer and is not updated at this point.
> But internalListCopySerializer is still used later.
> Therefore, when the StateMetaInfo is updated, the internalListCopySerializer also needs to be updated.
> This problem also exists in BroadcastState



--
This message was sent by Atlassian Jira
(v8.20.1#820001)