You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/07/26 15:43:01 UTC
[flink] branch master updated: [FLINK-26853][state] Update state serializer in StateMap when metaInfo changed
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8df50536ef9 [FLINK-26853][state] Update state serializer in StateMap when metaInfo changed
8df50536ef9 is described below
commit 8df50536ef913b63620d896423c39cdd01941c55
Author: Hangxiang Yu <ma...@gmail.com>
AuthorDate: Thu Jul 14 11:34:18 2022 +0800
[FLINK-26853][state] Update state serializer in StateMap when metaInfo changed
---
.../runtime/state/heap/CopyOnWriteStateMap.java | 6 +++-
.../runtime/state/heap/CopyOnWriteStateTable.java | 9 ++++++
.../state/heap/CopyOnWriteStateTableTest.java | 36 ++++++++++++++++++++++
3 files changed, 50 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java
index e33a3cf0498..268605dc225 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java
@@ -134,7 +134,7 @@ public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
/** The serializer of the state. */
- protected final TypeSerializer<S> stateSerializer;
+ protected TypeSerializer<S> stateSerializer;
/**
* An empty map shared by all zero-capacity maps (typically from default constructor). It is
@@ -794,6 +794,10 @@ public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
return stateSerializer;
}
+ public void setStateSerializer(TypeSerializer<S> stateSerializer) {
+ this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
+ }
+
// StateMapEntry
// -------------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index 3e23098094e..7227bf31fd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -55,6 +55,15 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> {
return new CopyOnWriteStateMap<>(getStateSerializer());
}
+ @Override
+ public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
+ super.setMetaInfo(metaInfo);
+ for (StateMap<K, N, S> keyGroupedStateMap : keyGroupedStateMaps) {
+ ((CopyOnWriteStateMap<K, N, S>) keyGroupedStateMap)
+ .setStateSerializer(metaInfo.getStateSerializer());
+ }
+ }
+
// Snapshotting
// ----------------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index 6f8266b81f0..54966ae9066 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -24,8 +24,11 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.testutils.statemigration.TestType;
+import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;
@@ -36,6 +39,39 @@ import java.util.concurrent.ThreadLocalRandom;
/** Test for {@link CopyOnWriteStateTable}. */
public class CopyOnWriteStateTableTest {
+ /**
+ * This tests that Whether serializers are consistent between {@link StateTable} and {@link
+ * StateMap}.
+ */
+ @Test
+ public void testSerializerAfterMetaInfoChanged() {
+ RegisteredKeyValueStateBackendMetaInfo<Integer, TestType> originalMetaInfo =
+ new RegisteredKeyValueStateBackendMetaInfo<>(
+ StateDescriptor.Type.VALUE,
+ "test",
+ IntSerializer.INSTANCE,
+ new TestType.V1TestTypeSerializer());
+ InternalKeyContext<Integer> mockKeyContext =
+ new InternalKeyContextImpl<>(KeyGroupRange.of(0, 9), 10);
+ CopyOnWriteStateTable<Integer, Integer, TestType> table =
+ new CopyOnWriteStateTable<>(
+ mockKeyContext, originalMetaInfo, IntSerializer.INSTANCE);
+
+ RegisteredKeyValueStateBackendMetaInfo<Integer, TestType> newMetaInfo =
+ new RegisteredKeyValueStateBackendMetaInfo<>(
+ StateDescriptor.Type.VALUE,
+ "test",
+ IntSerializer.INSTANCE,
+ new TestType.V2TestTypeSerializer());
+ table.setMetaInfo(newMetaInfo);
+ Preconditions.checkState(table.getState().length > 0);
+ for (StateMap<?, ?, ?> stateEntries : table.getState()) {
+ Assert.assertEquals(
+ table.getStateSerializer(),
+ ((CopyOnWriteStateMap<?, ?, ?>) stateEntries).getStateSerializer());
+ }
+ }
+
/**
* This tests that serializers used for snapshots are duplicates of the ones used in processing
* to avoid race conditions in stateful serializers.