Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/11 20:01:33 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #19679: [FLINK-23143][state/changelog] Support state migration for ChangelogS…

rkhachatryan commented on code in PR #19679:
URL: https://github.com/apache/flink/pull/19679#discussion_r918295754


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java:
##########
@@ -163,6 +170,32 @@ KeyGroupedInternalPriorityQueue<T> create(
         return priorityQueuesManager.createOrUpdate(stateName, byteOrderedElementSerializer);
     }
 
+    @Override
+    public <N, SV, SEV, S extends State, IS extends S> IS upgradeKeyedState(
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, SV> stateDescriptor,
+            @Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
+            throws Exception {
+        Preconditions.checkState(createdKVStates.containsKey(stateDescriptor.getName()));
+        registeredKVStates.computeIfPresent(
+                stateDescriptor.getName(),
+                (stateName, stateTable) -> {
+                    stateTable.setMetaInfo(
+                            new RegisteredKeyValueStateBackendMetaInfo<>(
+                                    stateTable.getMetaInfo().snapshot()));
+                    return stateTable;

Review Comment:
   1. I think we also need to update the serializers of the `StateMap`s, don't we?
   2. The purpose of re-creating `RegisteredKeyValueStateBackendMetaInfo` isn't obvious (it uses a different `StateSerializerProvider`, right?). I think it deserves a comment. WDYT?
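   A minimal sketch of the concern in point 1, using a hypothetical toy model: `ToySerializer`, `ToyMetaInfo`, `ToyStateMap` and `ToyStateTable` are illustrative stand-ins, not Flink classes. It assumes, as the comment suggests, that each per-key-group `StateMap` keeps its own serializer reference, so replacing only the table-level meta info leaves those maps pointing at the old serializer:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a heap state table; NOT Flink's real classes.
// It only shows why swapping the table-level meta info is not enough when
// each per-key-group map keeps its own serializer reference.

/** Runs the scenario from point 1 of the review comment. */
class StateMapUpgradeDemo {
    public static void main(String[] args) {
        ToyStateTable partial = new ToyStateTable(new ToyMetaInfo(new ToySerializer(1)), 4);
        partial.setMetaInfoOnly(new ToyMetaInfo(new ToySerializer(2)));
        System.out.println("meta info only, stale maps: " + partial.hasStaleMaps());

        ToyStateTable full = new ToyStateTable(new ToyMetaInfo(new ToySerializer(1)), 4);
        full.upgrade(new ToyMetaInfo(new ToySerializer(2)));
        System.out.println("meta info + maps, stale maps: " + full.hasStaleMaps());
    }
}

/** Stand-in for a TypeSerializer; only its identity and version matter here. */
final class ToySerializer {
    final int version;
    ToySerializer(int version) { this.version = version; }
}

/** Stand-in for RegisteredKeyValueStateBackendMetaInfo. */
final class ToyMetaInfo {
    final ToySerializer stateSerializer;
    ToyMetaInfo(ToySerializer stateSerializer) { this.stateSerializer = stateSerializer; }
}

/** Stand-in for a per-key-group StateMap that caches its own serializer. */
final class ToyStateMap {
    ToySerializer stateSerializer;
    ToyStateMap(ToySerializer stateSerializer) { this.stateSerializer = stateSerializer; }
}

/** Stand-in for StateTable: one meta info plus one map per key group. */
final class ToyStateTable {
    ToyMetaInfo metaInfo;
    final List<ToyStateMap> keyGroupMaps = new ArrayList<>();

    ToyStateTable(ToyMetaInfo metaInfo, int numKeyGroups) {
        this.metaInfo = metaInfo;
        for (int i = 0; i < numKeyGroups; i++) {
            keyGroupMaps.add(new ToyStateMap(metaInfo.stateSerializer));
        }
    }

    /** Mirrors the diff: only the table-level meta info is replaced. */
    void setMetaInfoOnly(ToyMetaInfo newMetaInfo) {
        this.metaInfo = newMetaInfo;
    }

    /** What point 1 asks about: also propagate the new serializer to every map. */
    void upgrade(ToyMetaInfo newMetaInfo) {
        this.metaInfo = newMetaInfo;
        for (ToyStateMap map : keyGroupMaps) {
            map.stateSerializer = newMetaInfo.stateSerializer;
        }
    }

    /** True if any map still references a serializer other than the meta info's. */
    boolean hasStaleMaps() {
        for (ToyStateMap map : keyGroupMaps) {
            if (map.stateSerializer != metaInfo.stateSerializer) {
                return true;
            }
        }
        return false;
    }
}
```

   In this model the meta-info-only path reports stale maps (`true`) and the path that also updates the maps does not (`false`). How the real `StateMap`s hold their serializers is an assumption here.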



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -565,18 +568,44 @@ public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
                     StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
                             snapshotTransformFactory)
             throws Exception {
+        ChangelogState changelogState =
+                changelogStateFactory.getExistingState(
+                        stateDesc.getName(), BackendStateType.KEY_VALUE);
+        if (changelogState == null) {
+            InternalKvState<K, N, SV> state =
+                    keyedStateBackend.createInternalState(
+                            namespaceSerializer, stateDesc, snapshotTransformFactory);
+
+            changelogState =
+                    changelogStateFactory.create(
+                            stateDesc,
+                            state,
+                            getKvStateChangeLogger(state, stateDesc, snapshotTransformFactory),
+                            keyedStateBackend /* pass the nested backend as key context so that it gets key updates on recovery */);
+        } else {
+            InternalKvState<K, N, SV> state =
+                    keyedStateBackend.upgradeKeyedState(
+                            namespaceSerializer, stateDesc, snapshotTransformFactory);
+            changelogState.setDelegatedState(state);

Review Comment:
   Is it only the delegated state that needs to be upgraded?
   I think at least the serializer inside the logger needs to be upgraded as well.
   
   ditto: PQ state
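   To illustrate this comment, here is a hypothetical toy model (not Flink's API) in which the changelog wrapper holds both a delegated state and a change logger, each with its own serializer. Swapping only the delegated state, as `setDelegatedState` does in the diff, leaves the logger writing changes with the old serializer. The fix is modeled by `upgrade`, an illustrative name that does not appear in the PR:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a changelog state wrapper; NOT Flink's API.
// The wrapper holds a delegated state and a change logger, each with a serializer.

class LoggerSerializerDemo {
    public static void main(String[] args) {
        ToyVersionedSerializer v1 = new ToyVersionedSerializer(1);
        ToyVersionedSerializer v2 = new ToyVersionedSerializer(2);

        ToyChangelogState delegateOnly =
                new ToyChangelogState(new ToyDelegatedState(v1), new ToyChangeLogger(v1));
        delegateOnly.setDelegatedState(new ToyDelegatedState(v2));
        delegateOnly.update("x");
        System.out.println("delegate only: " + delegateOnly.logger.log); // [v1:x]

        ToyChangelogState both =
                new ToyChangelogState(new ToyDelegatedState(v1), new ToyChangeLogger(v1));
        both.upgrade(new ToyDelegatedState(v2));
        both.update("x");
        System.out.println("delegate + logger: " + both.logger.log); // [v2:x]
    }
}

/** Stand-in for a TypeSerializer; tags every value with its version. */
final class ToyVersionedSerializer {
    final int version;
    ToyVersionedSerializer(int version) { this.version = version; }
    String serialize(String value) { return "v" + version + ":" + value; }
}

/** Stand-in for the nested backend's state (the "delegated state"). */
final class ToyDelegatedState {
    final ToyVersionedSerializer serializer;
    ToyDelegatedState(ToyVersionedSerializer serializer) { this.serializer = serializer; }
}

/** Stand-in for the state change logger; it serializes changes with its own serializer. */
final class ToyChangeLogger {
    ToyVersionedSerializer serializer;
    final List<String> log = new ArrayList<>();
    ToyChangeLogger(ToyVersionedSerializer serializer) { this.serializer = serializer; }
    void valueUpdated(String value) { log.add(serializer.serialize(value)); }
}

/** Stand-in for a ChangelogState wrapper. */
final class ToyChangelogState {
    ToyDelegatedState delegated;
    final ToyChangeLogger logger;

    ToyChangelogState(ToyDelegatedState delegated, ToyChangeLogger logger) {
        this.delegated = delegated;
        this.logger = logger;
    }

    /** Mirrors the diff: only the delegated state is swapped. */
    void setDelegatedState(ToyDelegatedState newDelegated) {
        this.delegated = newDelegated;
    }

    /** What the comment asks for: the logger's serializer follows the upgrade too. */
    void upgrade(ToyDelegatedState newDelegated) {
        this.delegated = newDelegated;
        this.logger.serializer = newDelegated.serializer;
    }

    /** In this toy, a write is only recorded by the logger (the delegated write is omitted). */
    void update(String value) {
        logger.valueUpdated(value);
    }
}
```

   Per the "ditto" above, the same reasoning would presumably apply to the wrapper around priority-queue (PQ) state.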



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -565,18 +568,44 @@ public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
                     StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
                             snapshotTransformFactory)
             throws Exception {
+        ChangelogState changelogState =
+                changelogStateFactory.getExistingState(
+                        stateDesc.getName(), BackendStateType.KEY_VALUE);
+        if (changelogState == null) {

Review Comment:
   I'm wondering about the "normal" case, i.e. with both materialized and non-materialized state:
   1. The internal backend might receive serializers from its initial snapshot
   2. When the `METADATA` record is first read from the changelog, `changelogState` will be `null`
   3. So `keyedStateBackend.upgradeKeyedState` will not be called
   
   Or am I missing something?
   
   ditto: PQ state
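   The three steps above can be traced with a hypothetical toy model (not Flink's API; `ToyNestedBackend`, `ToyChangelogBackend` and `restoreFromSnapshot` are illustrative names). The nested backend already knows state `s` from its materialized snapshot, but the changelog-level registry is still empty when the first `METADATA` record is replayed, so the branch that calls `createInternalState` is taken instead of the one that calls `upgradeKeyedState`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the recovery sequence in this comment; NOT Flink's API.

class RecoveryBranchDemo {
    public static void main(String[] args) {
        ToyNestedBackend nested = new ToyNestedBackend();
        // Step 1: the nested backend is restored from its materialized snapshot,
        // so it already knows state "s" (with the serializer from that snapshot).
        nested.restoreFromSnapshot("s");

        ToyChangelogBackend changelog = new ToyChangelogBackend(nested);
        // Step 2: the first METADATA record for "s" is replayed; the changelog-level
        // registry is still empty, so the "create" branch is taken.
        changelog.createInternalState("s");

        // Step 3: upgradeKeyedState was never called on the nested backend.
        System.out.println(nested.registered.contains("s")); // true
        System.out.println(nested.calls); // [restore:s, create:s]
    }
}

/** Stand-in for the nested backend; it only records which methods were called. */
final class ToyNestedBackend {
    final Set<String> registered = new HashSet<>();
    final List<String> calls = new ArrayList<>();

    void restoreFromSnapshot(String name) { calls.add("restore:" + name); registered.add(name); }
    void createInternalState(String name) { calls.add("create:" + name); registered.add(name); }
    void upgradeKeyedState(String name) { calls.add("upgrade:" + name); }
}

/** Stand-in for ChangelogKeyedStateBackend, branching the same way as the diff. */
final class ToyChangelogBackend {
    final ToyNestedBackend nested;
    final Map<String, String> changelogStates = new HashMap<>(); // name -> wrapper placeholder

    ToyChangelogBackend(ToyNestedBackend nested) { this.nested = nested; }

    void createInternalState(String name) {
        if (!changelogStates.containsKey(name)) {
            // Corresponds to getExistingState(...) == null in the diff.
            nested.createInternalState(name);
            changelogStates.put(name, "wrapper:" + name);
        } else {
            nested.upgradeKeyedState(name);
        }
    }
}
```

   In this model only a second registration of `s` would reach the upgrade branch; whether the real nested `createInternalState` compensates for this is exactly the open question in the comment.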



-- 
This is an automated message from the Apache Git Service.