Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/20 13:43:33 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #20268: [FLINK-26853][state] Update state serializer in StateMap when metaInfo changed

rkhachatryan commented on code in PR #20268:
URL: https://github.com/apache/flink/pull/20268#discussion_r925622394


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java:
##########
@@ -26,16 +26,53 @@
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.testutils.statemigration.TestType;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.concurrent.ThreadLocalRandom;
 
 /** Test for {@link CopyOnWriteStateTable}. */
 public class CopyOnWriteStateTableTest {
 
+    /**
+     * This tests that Whether serializers are consistent between {@link StateTable} and {@link
+     * StateMap}.
+     */
+    @Test
+    public void testSerializerAfterMetaInfoChanged() {
+        RegisteredKeyValueStateBackendMetaInfo<Integer, TestType> originalMetaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        StateDescriptor.Type.VALUE,
+                        "test",
+                        IntSerializer.INSTANCE,
+                        new TestType.V1TestTypeSerializer());
+        InternalKeyContext<Integer> mockKeyContext = new MockInternalKeyContext<>();
+        CopyOnWriteStateTable<Integer, Integer, TestType> table =
+                new CopyOnWriteStateTable<>(
+                        mockKeyContext, originalMetaInfo, IntSerializer.INSTANCE);
+
+        RegisteredKeyValueStateBackendMetaInfo<Integer, TestType> newMetaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        StateDescriptor.Type.VALUE,
+                        "test",
+                        IntSerializer.INSTANCE,
+                        new TestType.V2TestTypeSerializer());
+        table.setMetaInfo(newMetaInfo);
+        long count =
+                Arrays.stream(table.getState())
+                        .filter(
+                                stateEntries ->
+                                        ((CopyOnWriteStateMap<?, ?, ?>) stateEntries)
+                                                .getStateSerializer()
+                                                .equals(table.getStateSerializer()))
+                        .count();
+        Assert.assertEquals(table.getState().length, count);

Review Comment:
   1. How about adding a precondition such as `checkState(table.getState().length > 0);`? It would fail the test if its configuration (the key group range, for example) is invalid and the table ends up with no state maps.
   2. How about moving the assertion inside a loop:
   ```
           for (StateMap<?, ?, ?> stateEntries : table.getState()) {
               assertEquals(
                       table.getStateSerializer(),
                       ((CopyOnWriteStateMap<?, ?, ?>) stateEntries).getStateSerializer());
           }
   ```
   That would produce an informative error message if the assertion fails.
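
To illustrate both suggestions outside of Flink, here is a minimal stand-alone sketch. It is not the PR's code: `PerElementAssertionSketch`, `checkByCount`, and `checkEach` are hypothetical names, and plain strings stand in for the serializers held by each state map. It contrasts a count-based check, which passes vacuously on an empty array and reports only a number on failure, with a guarded per-element check that names the first mismatching entry.

```java
import java.util.Arrays;

/** Stand-alone sketch; none of these names are Flink classes. */
final class PerElementAssertionSketch {

    /** Count-based check: on failure it reports only how many entries matched. */
    static void checkByCount(String expected, String[] actualPerMap) {
        long matching = Arrays.stream(actualPerMap).filter(expected::equals).count();
        if (matching != actualPerMap.length) {
            throw new AssertionError(
                    "expected:<" + actualPerMap.length + "> but was:<" + matching + ">");
        }
    }

    /** Per-element check: rejects an empty array, then names the first mismatch. */
    static void checkEach(String expected, String[] actualPerMap) {
        if (actualPerMap.length == 0) {
            // Plays the role of the suggested checkState(...) precondition.
            throw new IllegalStateException("no state maps to check");
        }
        for (int i = 0; i < actualPerMap.length; i++) {
            if (!expected.equals(actualPerMap[i])) {
                throw new AssertionError(
                        "state map " + i + ": expected:<" + expected
                                + "> but was:<" + actualPerMap[i] + ">");
            }
        }
    }

    public static void main(String[] args) {
        // The middle entry still carries the old (stand-in) serializer.
        String[] maps = {"V2", "V1", "V2"};
        try {
            checkByCount("V2", maps);
        } catch (AssertionError e) {
            // prints "count-based: expected:<3> but was:<2>"
            System.out.println("count-based: " + e.getMessage());
        }
        try {
            checkEach("V2", maps);
        } catch (AssertionError e) {
            // prints "per-element: state map 1: expected:<V2> but was:<V1>"
            System.out.println("per-element: " + e.getMessage());
        }
    }
}
```

Note that `checkByCount("V2", new String[0])` succeeds because zero matches equal zero entries, which is the silent-pass case the precondition is meant to rule out.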



##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java:
##########
@@ -26,16 +26,53 @@
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.testutils.statemigration.TestType;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.concurrent.ThreadLocalRandom;
 
 /** Test for {@link CopyOnWriteStateTable}. */
 public class CopyOnWriteStateTableTest {
 
+    /**
+     * This tests that Whether serializers are consistent between {@link StateTable} and {@link
+     * StateMap}.
+     */
+    @Test
+    public void testSerializerAfterMetaInfoChanged() {
+        RegisteredKeyValueStateBackendMetaInfo<Integer, TestType> originalMetaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        StateDescriptor.Type.VALUE,
+                        "test",
+                        IntSerializer.INSTANCE,
+                        new TestType.V1TestTypeSerializer());
+        InternalKeyContext<Integer> mockKeyContext = new MockInternalKeyContext<>();

Review Comment:
   I don't think mocking is needed here; we can use `InternalKeyContextImpl` directly:
   ```
   InternalKeyContext<Integer> keyContext =
           new InternalKeyContextImpl<>(KeyGroupRange.of(0, 9), 10);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
