You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/07/26 15:43:01 UTC

[flink] branch master updated: [FLINK-26853][state] Update state serializer in StateMap when metaInfo changed

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8df50536ef9 [FLINK-26853][state] Update state serializer in StateMap when metaInfo changed
8df50536ef9 is described below

commit 8df50536ef913b63620d896423c39cdd01941c55
Author: Hangxiang Yu <ma...@gmail.com>
AuthorDate: Thu Jul 14 11:34:18 2022 +0800

    [FLINK-26853][state] Update state serializer in StateMap when metaInfo changed
---
 .../runtime/state/heap/CopyOnWriteStateMap.java    |  6 +++-
 .../runtime/state/heap/CopyOnWriteStateTable.java  |  9 ++++++
 .../state/heap/CopyOnWriteStateTableTest.java      | 36 ++++++++++++++++++++++
 3 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java
index e33a3cf0498..268605dc225 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java
@@ -134,7 +134,7 @@ public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
     private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
 
     /** The serializer of the state. */
-    protected final TypeSerializer<S> stateSerializer;
+    protected TypeSerializer<S> stateSerializer;
 
     /**
      * An empty map shared by all zero-capacity maps (typically from default constructor). It is
@@ -794,6 +794,10 @@ public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
         return stateSerializer;
     }
 
+    public void setStateSerializer(TypeSerializer<S> stateSerializer) {
+        this.stateSerializer = Preconditions.checkNotNull(stateSerializer); // null not allowed
+    }
+
     // StateMapEntry
     // -------------------------------------------------------------------------------------------------
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index 3e23098094e..7227bf31fd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -55,6 +55,15 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> {
         return new CopyOnWriteStateMap<>(getStateSerializer());
     }
 
+    @Override
+    public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
+        super.setMetaInfo(metaInfo); // then push the new state serializer into every map
+        for (StateMap<K, N, S> keyGroupedStateMap : keyGroupedStateMaps) {
+            ((CopyOnWriteStateMap<K, N, S>) keyGroupedStateMap) // assumes only COW maps; verify
+                    .setStateSerializer(metaInfo.getStateSerializer());
+        }
+    }
+
     // Snapshotting
     // ----------------------------------------------------------------------------------------------------
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index 6f8266b81f0..54966ae9066 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -24,8 +24,11 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.testutils.statemigration.TestType;
+import org.apache.flink.util.Preconditions;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,6 +39,39 @@ import java.util.concurrent.ThreadLocalRandom;
 /** Test for {@link CopyOnWriteStateTable}. */
 public class CopyOnWriteStateTableTest {
 
+    /**
+     * Tests that the serializers of the {@link StateTable} and of its {@link StateMap}s stay
+     * consistent after the table's meta info has been replaced.
+     */
+    @Test
+    public void testSerializerAfterMetaInfoChanged() {
+        RegisteredKeyValueStateBackendMetaInfo<Integer, TestType> originalMetaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        StateDescriptor.Type.VALUE,
+                        "test",
+                        IntSerializer.INSTANCE,
+                        new TestType.V1TestTypeSerializer());
+        InternalKeyContext<Integer> mockKeyContext =
+                new InternalKeyContextImpl<>(KeyGroupRange.of(0, 9), 10);
+        CopyOnWriteStateTable<Integer, Integer, TestType> table =
+                new CopyOnWriteStateTable<>(
+                        mockKeyContext, originalMetaInfo, IntSerializer.INSTANCE);
+
+        RegisteredKeyValueStateBackendMetaInfo<Integer, TestType> newMetaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        StateDescriptor.Type.VALUE,
+                        "test",
+                        IntSerializer.INSTANCE,
+                        new TestType.V2TestTypeSerializer());
+        table.setMetaInfo(newMetaInfo);
+        Preconditions.checkState(table.getState().length > 0); // else the loop checks nothing
+        for (StateMap<?, ?, ?> stateEntries : table.getState()) {
+            Assert.assertEquals(
+                    table.getStateSerializer(),
+                    ((CopyOnWriteStateMap<?, ?, ?>) stateEntries).getStateSerializer());
+        }
+    }
+
     /**
      * This tests that serializers used for snapshots are duplicates of the ones used in processing
      * to avoid race conditions in stateful serializers.