You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/23 02:10:54 UTC

[flink] 02/03: [FLINK-13322][table-runtime-blink] Fix serializer snapshot recovery in BaseArray and BaseMap serializers

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 56e45c8397bdaf3a8dec45164021c54db5b47e7e
Author: TsReaper <ts...@gmail.com>
AuthorDate: Thu Jul 18 18:47:42 2019 +0800

    [FLINK-13322][table-runtime-blink] Fix serializer snapshot recovery in BaseArray and BaseMap serializers
---
 .../flink/table/types/InternalSerializers.java     |   4 +-
 .../flink/table/typeutils/BaseArraySerializer.java |  26 +++-
 .../flink/table/typeutils/BaseMapSerializer.java   |  59 +++++++--
 .../apache/flink/table/dataformat/BaseRowTest.java |   3 +-
 .../flink/table/dataformat/BinaryArrayTest.java    |   4 +-
 .../table/typeutils/BaseArraySerializerTest.java   |  57 ++++++++
 .../table/typeutils/BaseMapSerializerTest.java     |  66 +++++++++-
 .../flink/table/typeutils/SerializerTestUtil.java  | 143 +++++++++++++++++++++
 8 files changed, 337 insertions(+), 25 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
index e6eaf81..c8d5b76 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
@@ -80,9 +80,9 @@ public class InternalSerializers {
 				return new BaseArraySerializer(((ArrayType) type).getElementType(), config);
 			case MAP:
 				MapType mapType = (MapType) type;
-				return new BaseMapSerializer(mapType.getKeyType(), mapType.getValueType());
+				return new BaseMapSerializer(mapType.getKeyType(), mapType.getValueType(), config);
 			case MULTISET:
-				return new BaseMapSerializer(((MultisetType) type).getElementType(), new IntType());
+				return new BaseMapSerializer(((MultisetType) type).getElementType(), new IntType(), config);
 			case ROW:
 				RowType rowType = (RowType) type;
 				return new BaseRowSerializer(config, rowType);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
index d8bf95e..c749dfa 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.table.typeutils;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -216,9 +217,14 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> {
 		return eleType.hashCode();
 	}
 
+	@VisibleForTesting
+	public TypeSerializer getEleSer() {
+		return eleSer;
+	}
+
 	@Override
 	public TypeSerializerSnapshot<BaseArray> snapshotConfiguration() {
-		return new BaseArraySerializerSnapshot(eleType);
+		return new BaseArraySerializerSnapshot(eleType, eleSer);
 	}
 
 	/**
@@ -228,14 +234,16 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> {
 		private static final int CURRENT_VERSION = 3;
 
 		private LogicalType previousType;
+		private TypeSerializer previousEleSer;
 
 		@SuppressWarnings("unused")
 		public BaseArraySerializerSnapshot() {
 			// this constructor is used when restoring from a checkpoint/savepoint.
 		}
 
-		BaseArraySerializerSnapshot(LogicalType eleType) {
+		BaseArraySerializerSnapshot(LogicalType eleType, TypeSerializer eleSer) {
 			this.previousType = eleType;
+			this.previousEleSer = eleSer;
 		}
 
 		@Override
@@ -245,14 +253,17 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> {
 
 		@Override
 		public void writeSnapshot(DataOutputView out) throws IOException {
-			InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousType);
+			DataOutputViewStream outStream = new DataOutputViewStream(out);
+			InstantiationUtil.serializeObject(outStream, previousType);
+			InstantiationUtil.serializeObject(outStream, previousEleSer);
 		}
 
 		@Override
 		public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
 			try {
-				this.previousType = InstantiationUtil.deserializeObject(
-						new DataInputViewStream(in), userCodeClassLoader);
+				DataInputViewStream inStream = new DataInputViewStream(in);
+				this.previousType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+				this.previousEleSer = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
 			} catch (ClassNotFoundException e) {
 				throw new IOException(e);
 			}
@@ -260,7 +271,7 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> {
 
 		@Override
 		public TypeSerializer<BaseArray> restoreSerializer() {
-			return new BaseArraySerializer(previousType, new ExecutionConfig());
+			return new BaseArraySerializer(previousType, previousEleSer);
 		}
 
 		@Override
@@ -270,7 +281,8 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> {
 			}
 
 			BaseArraySerializer newBaseArraySerializer = (BaseArraySerializer) newSerializer;
-			if (!previousType.equals(newBaseArraySerializer.eleType)) {
+			if (!previousType.equals(newBaseArraySerializer.eleType) ||
+				!previousEleSer.equals(newBaseArraySerializer.eleSer)) {
 				return TypeSerializerSchemaCompatibility.incompatible();
 			} else {
 				return TypeSerializerSchemaCompatibility.compatibleAsIs();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
index 01dba56..766c3b5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.table.typeutils;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -62,12 +63,21 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
 	private transient BinaryArrayWriter reuseKeyWriter;
 	private transient BinaryArrayWriter reuseValueWriter;
 
-	public BaseMapSerializer(LogicalType keyType, LogicalType valueType) {
+	public BaseMapSerializer(LogicalType keyType, LogicalType valueType, ExecutionConfig conf) {
 		this.keyType = keyType;
 		this.valueType = valueType;
 
-		this.keySerializer = InternalSerializers.create(keyType, new ExecutionConfig());
-		this.valueSerializer = InternalSerializers.create(valueType, new ExecutionConfig());
+		this.keySerializer = InternalSerializers.create(keyType, conf);
+		this.valueSerializer = InternalSerializers.create(valueType, conf);
+	}
+
+	private BaseMapSerializer(
+			LogicalType keyType, LogicalType valueType, TypeSerializer keySerializer, TypeSerializer valueSerializer) {
+		this.keyType = keyType;
+		this.valueType = valueType;
+
+		this.keySerializer = keySerializer;
+		this.valueSerializer = valueSerializer;
 	}
 
 	@Override
@@ -77,7 +87,7 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
 
 	@Override
 	public TypeSerializer<BaseMap> duplicate() {
-		return new BaseMapSerializer(keyType, valueType);
+		return new BaseMapSerializer(keyType, valueType, keySerializer.duplicate(), valueSerializer.duplicate());
 	}
 
 	@Override
@@ -217,9 +227,19 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
 		return result;
 	}
 
+	@VisibleForTesting
+	public TypeSerializer getKeySerializer() {
+		return keySerializer;
+	}
+
+	@VisibleForTesting
+	public TypeSerializer getValueSerializer() {
+		return valueSerializer;
+	}
+
 	@Override
 	public TypeSerializerSnapshot<BaseMap> snapshotConfiguration() {
-		return new BaseMapSerializerSnapshot(keyType, valueType);
+		return new BaseMapSerializerSnapshot(keyType, valueType, keySerializer, valueSerializer);
 	}
 
 	/**
@@ -231,14 +251,20 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
 		private LogicalType previousKeyType;
 		private LogicalType previousValueType;
 
+		private TypeSerializer previousKeySerializer;
+		private TypeSerializer previousValueSerializer;
+
 		@SuppressWarnings("unused")
 		public BaseMapSerializerSnapshot() {
 			// this constructor is used when restoring from a checkpoint/savepoint.
 		}
 
-		BaseMapSerializerSnapshot(LogicalType keyT, LogicalType valueT) {
+		BaseMapSerializerSnapshot(LogicalType keyT, LogicalType valueT, TypeSerializer keySer, TypeSerializer valueSer) {
 			this.previousKeyType = keyT;
 			this.previousValueType = valueT;
+
+			this.previousKeySerializer = keySer;
+			this.previousValueSerializer = valueSer;
 		}
 
 		@Override
@@ -248,15 +274,21 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
 
 		@Override
 		public void writeSnapshot(DataOutputView out) throws IOException {
-			InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousKeyType);
-			InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousValueType);
+			DataOutputViewStream outStream = new DataOutputViewStream(out);
+			InstantiationUtil.serializeObject(outStream, previousKeyType);
+			InstantiationUtil.serializeObject(outStream, previousValueType);
+			InstantiationUtil.serializeObject(outStream, previousKeySerializer);
+			InstantiationUtil.serializeObject(outStream, previousValueSerializer);
 		}
 
 		@Override
 		public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
 			try {
-				this.previousKeyType = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader);
-				this.previousValueType = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader);
+				DataInputViewStream inStream = new DataInputViewStream(in);
+				this.previousKeyType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+				this.previousValueType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+				this.previousKeySerializer = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+				this.previousValueSerializer = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
 			} catch (ClassNotFoundException e) {
 				throw new IOException(e);
 			}
@@ -264,7 +296,8 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
 
 		@Override
 		public TypeSerializer<BaseMap> restoreSerializer() {
-			return new BaseMapSerializer(previousKeyType, previousValueType);
+			return new BaseMapSerializer(
+				previousKeyType, previousValueType, previousKeySerializer, previousValueSerializer);
 		}
 
 		@Override
@@ -275,7 +308,9 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
 
 			BaseMapSerializer newBaseMapSerializer = (BaseMapSerializer) newSerializer;
 			if (!previousKeyType.equals(newBaseMapSerializer.keyType) ||
-					!previousValueType.equals(newBaseMapSerializer.valueType)) {
+				!previousValueType.equals(newBaseMapSerializer.valueType) ||
+				!previousKeySerializer.equals(newBaseMapSerializer.keySerializer) ||
+				!previousValueSerializer.equals(newBaseMapSerializer.valueSerializer)) {
 				return TypeSerializerSchemaCompatibility.incompatible();
 			} else {
 				return TypeSerializerSchemaCompatibility.compatibleAsIs();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
index 43bc6a6..8723405 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
@@ -102,7 +102,8 @@ public class BaseRowTest {
 		writer.writeDecimal(10, decimal1, 5);
 		writer.writeDecimal(11, decimal2, 20);
 		writer.writeArray(12, array, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
-		writer.writeMap(13, map, new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()));
+		writer.writeMap(13, map, new BaseMapSerializer(
+			DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), null));
 		writer.writeRow(14, underRow, new BaseRowSerializer(null, RowType.of(new IntType(), new IntType())));
 		writer.writeBinary(15, bytes);
 		return row;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
index 54d0802..5063646 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
@@ -325,7 +325,7 @@ public class BinaryArrayTest {
 			BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 			writer.setNullAt(0);
 			writer.writeMap(1, BinaryMap.valueOf(subArray, subArray),
-					new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()));
+					new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), null));
 			writer.complete();
 
 			assertTrue(array.isNullAt(0));
@@ -358,7 +358,7 @@ public class BinaryArrayTest {
 		BinaryRow row = new BinaryRow(1);
 		BinaryRowWriter rowWriter = new BinaryRowWriter(row);
 		rowWriter.writeMap(0, binaryMap,
-				new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()));
+				new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), null));
 		rowWriter.complete();
 
 		BinaryMap map = (BinaryMap) row.getMap(0);
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
index feb9893..f7f0e31 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
@@ -19,15 +19,31 @@
 package org.apache.flink.table.typeutils;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.dataformat.BaseArray;
 import org.apache.flink.table.dataformat.BinaryArray;
 import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryGeneric;
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.dataformat.GenericArray;
 import org.apache.flink.testutils.DeeplyEqualsChecker;
 
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObj;
+import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObjSerializer;
+import static org.apache.flink.table.typeutils.SerializerTestUtil.snapshotAndReconfigure;
+import static org.junit.Assert.assertEquals;
+
 /**
  * A test for the {@link BaseArraySerializer}.
  */
@@ -58,6 +74,47 @@ public class BaseArraySerializerTest extends SerializerTestBase<BaseArray> {
 		));
 	}
 
+	@Test
+	public void testExecutionConfigWithKryo() throws Exception {
+		// serialize base array
+		ExecutionConfig config = new ExecutionConfig();
+		config.enableForceKryo();
+		config.registerTypeWithKryoSerializer(MyObj.class, new MyObjSerializer());
+		final BaseArraySerializer serializer = createSerializerWithConfig(config);
+
+		MyObj inputObj = new MyObj(114514, 1919810);
+		BaseArray inputArray = new GenericArray(new BinaryGeneric[] {
+			new BinaryGeneric<>(inputObj, new KryoSerializer<>(MyObj.class, config))
+		}, 1);
+
+		byte[] serialized;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			serializer.serialize(inputArray, new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		// deserialize base array using restored serializer
+		final BaseArraySerializer restoreSerializer =
+			(BaseArraySerializer) snapshotAndReconfigure(serializer, () -> createSerializerWithConfig(config));
+
+		BaseArray outputArray;
+		try (ByteArrayInputStream in = new ByteArrayInputStream(serialized)) {
+			outputArray = restoreSerializer.deserialize(new DataInputViewStreamWrapper(in));
+		}
+
+		TypeSerializer restoreEleSer = restoreSerializer.getEleSer();
+		assertEquals(serializer.getEleSer(), restoreEleSer);
+
+		MyObj outputObj = BinaryGeneric.getJavaObjectFromBinaryGeneric(
+			outputArray.getGeneric(0), new KryoSerializer<>(MyObj.class, config));
+		assertEquals(inputObj, outputObj);
+	}
+
+	private BaseArraySerializer createSerializerWithConfig(ExecutionConfig config) {
+		return new BaseArraySerializer(
+			DataTypes.ANY(TypeInformation.of(MyObj.class)).getLogicalType(), config);
+	}
+
 	@Override
 	protected BaseArraySerializer createSerializer() {
 		return new BaseArraySerializer(DataTypes.STRING().getLogicalType(), new ExecutionConfig());
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
index 38216dc..6c7710b 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
@@ -18,20 +18,36 @@
 
 package org.apache.flink.table.typeutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.dataformat.BaseMap;
 import org.apache.flink.table.dataformat.BinaryArray;
 import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryGeneric;
 import org.apache.flink.table.dataformat.BinaryMap;
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.dataformat.GenericMap;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.testutils.DeeplyEqualsChecker;
 
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObj;
+import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObjSerializer;
+import static org.apache.flink.table.typeutils.SerializerTestUtil.snapshotAndReconfigure;
+import static org.junit.Assert.assertEquals;
+
 /**
  * A test for the {@link BaseMapSerializer}.
  */
@@ -55,8 +71,56 @@ public class BaseMapSerializerTest extends SerializerTestBase<BaseMap> {
 		));
 	}
 
+	@Test
+	public void testExecutionConfigWithKryo() throws Exception {
+		// serialize base map
+		ExecutionConfig config = new ExecutionConfig();
+		config.enableForceKryo();
+		config.registerTypeWithKryoSerializer(MyObj.class, new MyObjSerializer());
+		final BaseMapSerializer serializer = createSerializerWithConfig(config);
+
+		int inputKey = 998244353;
+		MyObj inputObj = new MyObj(114514, 1919810);
+		Map<Object, Object> javaMap = new HashMap<>();
+		javaMap.put(inputKey, new BinaryGeneric<>(inputObj, new KryoSerializer<>(MyObj.class, config)));
+		BaseMap inputMap = new GenericMap(javaMap);
+
+		byte[] serialized;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			serializer.serialize(inputMap, new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		// deserialize base map using restored serializer
+		final BaseMapSerializer restoreSerializer =
+			(BaseMapSerializer) snapshotAndReconfigure(serializer, () -> createSerializerWithConfig(config));
+
+		BaseMap outputMap;
+		try (ByteArrayInputStream in = new ByteArrayInputStream(serialized)) {
+			outputMap = restoreSerializer.deserialize(new DataInputViewStreamWrapper(in));
+		}
+
+		TypeSerializer restoreKeySer = restoreSerializer.getKeySerializer();
+		TypeSerializer restoreValueSer = restoreSerializer.getValueSerializer();
+		assertEquals(serializer.getKeySerializer(), restoreKeySer);
+		assertEquals(serializer.getValueSerializer(), restoreValueSer);
+
+		MyObj outputObj = BinaryGeneric.getJavaObjectFromBinaryGeneric(
+			(BinaryGeneric) outputMap.toJavaMap(
+				DataTypes.INT().getLogicalType(), DataTypes.ANY(TypeInformation.of(MyObj.class)).getLogicalType())
+				.get(inputKey), new KryoSerializer<>(MyObj.class, config));
+		assertEquals(inputObj, outputObj);
+	}
+
+	private BaseMapSerializer createSerializerWithConfig(ExecutionConfig config) {
+		return new BaseMapSerializer(
+			DataTypes.INT().getLogicalType(),
+			DataTypes.ANY(TypeInformation.of(MyObj.class)).getLogicalType(),
+			config);
+	}
+
 	private static BaseMapSerializer newSer() {
-		return new BaseMapSerializer(INT, STRING);
+		return new BaseMapSerializer(INT, STRING, new ExecutionConfig());
 	}
 
 	@Override
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/SerializerTestUtil.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/SerializerTestUtil.java
new file mode 100644
index 0000000..0587e07
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/SerializerTestUtil.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Utils for testing serializers.
+ */
+public class SerializerTestUtil {
+
+	/**
+	 * Snapshot and restore the given serializer. Returns the restored serializer.
+	 */
+	public static <T> TypeSerializer<T> snapshotAndReconfigure(
+		TypeSerializer<T> serializer, SerializerGetter<T> serializerGetter) throws IOException {
+		TypeSerializerSnapshot<T> configSnapshot = serializer.snapshotConfiguration();
+
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+				new DataOutputViewStreamWrapper(out), configSnapshot, serializer);
+			serializedConfig = out.toByteArray();
+		}
+
+		TypeSerializerSnapshot<T> restoredConfig;
+		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			restoredConfig = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+				new DataInputViewStreamWrapper(in),
+				Thread.currentThread().getContextClassLoader(),
+				serializerGetter.getSerializer());
+		}
+
+		TypeSerializerSchemaCompatibility<T> strategy =
+			restoredConfig.resolveSchemaCompatibility(serializerGetter.getSerializer());
+		final TypeSerializer<T> restoredSerializer;
+		if (strategy.isCompatibleAsIs()) {
+			restoredSerializer = restoredConfig.restoreSerializer();
+		}
+		else if (strategy.isCompatibleWithReconfiguredSerializer()) {
+			restoredSerializer = strategy.getReconfiguredSerializer();
+		}
+		else {
+			throw new AssertionError("Unable to restore serializer with " + strategy);
+		}
+		assertEquals(serializer.getClass(), restoredSerializer.getClass());
+
+		return restoredSerializer;
+	}
+
+	/**
+	 * Used by the snapshotAndReconfigure method to supply serializers when restoring.
+	 */
+	public interface SerializerGetter<T> {
+		TypeSerializer<T> getSerializer();
+	}
+
+	/**
+	 * A simple POJO.
+	 */
+	public static class MyObj {
+		private int a;
+		private int b;
+
+		MyObj(int a, int b) {
+			this.a = a;
+			this.b = b;
+		}
+
+		int getA() {
+			return a;
+		}
+
+		int getB() {
+			return b;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			return o instanceof MyObj && ((MyObj) o).a == a && ((MyObj) o).b == b;
+		}
+	}
+
+	/**
+	 * Kryo serializer for POJO.
+	 */
+	public static class MyObjSerializer extends Serializer<MyObj> implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+		private static final int delta = 7;
+
+		@Override
+		public void write(Kryo kryo, Output output, MyObj myObj) {
+			output.writeInt(myObj.getA() + delta);
+			output.writeInt(myObj.getB() + delta);
+		}
+
+		@Override
+		public MyObj read(Kryo kryo, Input input, Class<MyObj> aClass) {
+			int a = input.readInt() - delta;
+			int b = input.readInt() - delta;
+			return new MyObj(a, b);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			return o instanceof MyObjSerializer;
+		}
+	}
+}