You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/23 02:10:54 UTC
[flink] 02/03: [FLINK-13322][table-runtime-blink] Fix serializer
snapshot recovery in BaseArray and BaseMap serializers
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 56e45c8397bdaf3a8dec45164021c54db5b47e7e
Author: TsReaper <ts...@gmail.com>
AuthorDate: Thu Jul 18 18:47:42 2019 +0800
[FLINK-13322][table-runtime-blink] Fix serializer snapshot recovery in BaseArray and BaseMap serializers
---
.../flink/table/types/InternalSerializers.java | 4 +-
.../flink/table/typeutils/BaseArraySerializer.java | 26 +++-
.../flink/table/typeutils/BaseMapSerializer.java | 59 +++++++--
.../apache/flink/table/dataformat/BaseRowTest.java | 3 +-
.../flink/table/dataformat/BinaryArrayTest.java | 4 +-
.../table/typeutils/BaseArraySerializerTest.java | 57 ++++++++
.../table/typeutils/BaseMapSerializerTest.java | 66 +++++++++-
.../flink/table/typeutils/SerializerTestUtil.java | 143 +++++++++++++++++++++
8 files changed, 337 insertions(+), 25 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
index e6eaf81..c8d5b76 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
@@ -80,9 +80,9 @@ public class InternalSerializers {
return new BaseArraySerializer(((ArrayType) type).getElementType(), config);
case MAP:
MapType mapType = (MapType) type;
- return new BaseMapSerializer(mapType.getKeyType(), mapType.getValueType());
+ return new BaseMapSerializer(mapType.getKeyType(), mapType.getValueType(), config);
case MULTISET:
- return new BaseMapSerializer(((MultisetType) type).getElementType(), new IntType());
+ return new BaseMapSerializer(((MultisetType) type).getElementType(), new IntType(), config);
case ROW:
RowType rowType = (RowType) type;
return new BaseRowSerializer(config, rowType);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
index d8bf95e..c749dfa 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
@@ -17,6 +17,7 @@
package org.apache.flink.table.typeutils;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -216,9 +217,14 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> {
return eleType.hashCode();
}
+ @VisibleForTesting
+ public TypeSerializer getEleSer() {
+ return eleSer;
+ }
+
@Override
public TypeSerializerSnapshot<BaseArray> snapshotConfiguration() {
- return new BaseArraySerializerSnapshot(eleType);
+ return new BaseArraySerializerSnapshot(eleType, eleSer);
}
/**
@@ -228,14 +234,16 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> {
private static final int CURRENT_VERSION = 3;
private LogicalType previousType;
+ private TypeSerializer previousEleSer;
@SuppressWarnings("unused")
public BaseArraySerializerSnapshot() {
// this constructor is used when restoring from a checkpoint/savepoint.
}
- BaseArraySerializerSnapshot(LogicalType eleType) {
+ BaseArraySerializerSnapshot(LogicalType eleType, TypeSerializer eleSer) {
this.previousType = eleType;
+ this.previousEleSer = eleSer;
}
@Override
@@ -245,14 +253,17 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> {
@Override
public void writeSnapshot(DataOutputView out) throws IOException {
- InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousType);
+ DataOutputViewStream outStream = new DataOutputViewStream(out);
+ InstantiationUtil.serializeObject(outStream, previousType);
+ InstantiationUtil.serializeObject(outStream, previousEleSer);
}
@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
try {
- this.previousType = InstantiationUtil.deserializeObject(
- new DataInputViewStream(in), userCodeClassLoader);
+ DataInputViewStream inStream = new DataInputViewStream(in);
+ this.previousType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+ this.previousEleSer = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
@@ -260,7 +271,7 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> {
@Override
public TypeSerializer<BaseArray> restoreSerializer() {
- return new BaseArraySerializer(previousType, new ExecutionConfig());
+ return new BaseArraySerializer(previousType, previousEleSer);
}
@Override
@@ -270,7 +281,8 @@ public class BaseArraySerializer extends TypeSerializer<BaseArray> {
}
BaseArraySerializer newBaseArraySerializer = (BaseArraySerializer) newSerializer;
- if (!previousType.equals(newBaseArraySerializer.eleType)) {
+ if (!previousType.equals(newBaseArraySerializer.eleType) ||
+ !previousEleSer.equals(newBaseArraySerializer.eleSer)) {
return TypeSerializerSchemaCompatibility.incompatible();
} else {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
index 01dba56..766c3b5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
@@ -17,6 +17,7 @@
package org.apache.flink.table.typeutils;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
@@ -62,12 +63,21 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
private transient BinaryArrayWriter reuseKeyWriter;
private transient BinaryArrayWriter reuseValueWriter;
- public BaseMapSerializer(LogicalType keyType, LogicalType valueType) {
+ public BaseMapSerializer(LogicalType keyType, LogicalType valueType, ExecutionConfig conf) {
this.keyType = keyType;
this.valueType = valueType;
- this.keySerializer = InternalSerializers.create(keyType, new ExecutionConfig());
- this.valueSerializer = InternalSerializers.create(valueType, new ExecutionConfig());
+ this.keySerializer = InternalSerializers.create(keyType, conf);
+ this.valueSerializer = InternalSerializers.create(valueType, conf);
+ }
+
+ private BaseMapSerializer(
+ LogicalType keyType, LogicalType valueType, TypeSerializer keySerializer, TypeSerializer valueSerializer) {
+ this.keyType = keyType;
+ this.valueType = valueType;
+
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
}
@Override
@@ -77,7 +87,7 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
@Override
public TypeSerializer<BaseMap> duplicate() {
- return new BaseMapSerializer(keyType, valueType);
+ return new BaseMapSerializer(keyType, valueType, keySerializer.duplicate(), valueSerializer.duplicate());
}
@Override
@@ -217,9 +227,19 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
return result;
}
+ @VisibleForTesting
+ public TypeSerializer getKeySerializer() {
+ return keySerializer;
+ }
+
+ @VisibleForTesting
+ public TypeSerializer getValueSerializer() {
+ return valueSerializer;
+ }
+
@Override
public TypeSerializerSnapshot<BaseMap> snapshotConfiguration() {
- return new BaseMapSerializerSnapshot(keyType, valueType);
+ return new BaseMapSerializerSnapshot(keyType, valueType, keySerializer, valueSerializer);
}
/**
@@ -231,14 +251,20 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
private LogicalType previousKeyType;
private LogicalType previousValueType;
+ private TypeSerializer previousKeySerializer;
+ private TypeSerializer previousValueSerializer;
+
@SuppressWarnings("unused")
public BaseMapSerializerSnapshot() {
// this constructor is used when restoring from a checkpoint/savepoint.
}
- BaseMapSerializerSnapshot(LogicalType keyT, LogicalType valueT) {
+ BaseMapSerializerSnapshot(LogicalType keyT, LogicalType valueT, TypeSerializer keySer, TypeSerializer valueSer) {
this.previousKeyType = keyT;
this.previousValueType = valueT;
+
+ this.previousKeySerializer = keySer;
+ this.previousValueSerializer = valueSer;
}
@Override
@@ -248,15 +274,21 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
@Override
public void writeSnapshot(DataOutputView out) throws IOException {
- InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousKeyType);
- InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousValueType);
+ DataOutputViewStream outStream = new DataOutputViewStream(out);
+ InstantiationUtil.serializeObject(outStream, previousKeyType);
+ InstantiationUtil.serializeObject(outStream, previousValueType);
+ InstantiationUtil.serializeObject(outStream, previousKeySerializer);
+ InstantiationUtil.serializeObject(outStream, previousValueSerializer);
}
@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
try {
- this.previousKeyType = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader);
- this.previousValueType = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader);
+ DataInputViewStream inStream = new DataInputViewStream(in);
+ this.previousKeyType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+ this.previousValueType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+ this.previousKeySerializer = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+ this.previousValueSerializer = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
@@ -264,7 +296,8 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
@Override
public TypeSerializer<BaseMap> restoreSerializer() {
- return new BaseMapSerializer(previousKeyType, previousValueType);
+ return new BaseMapSerializer(
+ previousKeyType, previousValueType, previousKeySerializer, previousValueSerializer);
}
@Override
@@ -275,7 +308,9 @@ public class BaseMapSerializer extends TypeSerializer<BaseMap> {
BaseMapSerializer newBaseMapSerializer = (BaseMapSerializer) newSerializer;
if (!previousKeyType.equals(newBaseMapSerializer.keyType) ||
- !previousValueType.equals(newBaseMapSerializer.valueType)) {
+ !previousValueType.equals(newBaseMapSerializer.valueType) ||
+ !previousKeySerializer.equals(newBaseMapSerializer.keySerializer) ||
+ !previousValueSerializer.equals(newBaseMapSerializer.valueSerializer)) {
return TypeSerializerSchemaCompatibility.incompatible();
} else {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
index 43bc6a6..8723405 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
@@ -102,7 +102,8 @@ public class BaseRowTest {
writer.writeDecimal(10, decimal1, 5);
writer.writeDecimal(11, decimal2, 20);
writer.writeArray(12, array, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
- writer.writeMap(13, map, new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()));
+ writer.writeMap(13, map, new BaseMapSerializer(
+ DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), null));
writer.writeRow(14, underRow, new BaseRowSerializer(null, RowType.of(new IntType(), new IntType())));
writer.writeBinary(15, bytes);
return row;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
index 54d0802..5063646 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
@@ -325,7 +325,7 @@ public class BinaryArrayTest {
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
writer.setNullAt(0);
writer.writeMap(1, BinaryMap.valueOf(subArray, subArray),
- new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()));
+ new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), null));
writer.complete();
assertTrue(array.isNullAt(0));
@@ -358,7 +358,7 @@ public class BinaryArrayTest {
BinaryRow row = new BinaryRow(1);
BinaryRowWriter rowWriter = new BinaryRowWriter(row);
rowWriter.writeMap(0, binaryMap,
- new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()));
+ new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), null));
rowWriter.complete();
BinaryMap map = (BinaryMap) row.getMap(0);
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
index feb9893..f7f0e31 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
@@ -19,15 +19,31 @@
package org.apache.flink.table.typeutils;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.dataformat.BaseArray;
import org.apache.flink.table.dataformat.BinaryArray;
import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryGeneric;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.GenericArray;
import org.apache.flink.testutils.DeeplyEqualsChecker;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObj;
+import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObjSerializer;
+import static org.apache.flink.table.typeutils.SerializerTestUtil.snapshotAndReconfigure;
+import static org.junit.Assert.assertEquals;
+
/**
* A test for the {@link BaseArraySerializer}.
*/
@@ -58,6 +74,47 @@ public class BaseArraySerializerTest extends SerializerTestBase<BaseArray> {
));
}
+ @Test
+ public void testExecutionConfigWithKryo() throws Exception {
+ // serialize base array
+ ExecutionConfig config = new ExecutionConfig();
+ config.enableForceKryo();
+ config.registerTypeWithKryoSerializer(MyObj.class, new MyObjSerializer());
+ final BaseArraySerializer serializer = createSerializerWithConfig(config);
+
+ MyObj inputObj = new MyObj(114514, 1919810);
+ BaseArray inputArray = new GenericArray(new BinaryGeneric[] {
+ new BinaryGeneric<>(inputObj, new KryoSerializer<>(MyObj.class, config))
+ }, 1);
+
+ byte[] serialized;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ serializer.serialize(inputArray, new DataOutputViewStreamWrapper(out));
+ serialized = out.toByteArray();
+ }
+
+ // deserialize base array using restored serializer
+ final BaseArraySerializer restoreSerializer =
+ (BaseArraySerializer) snapshotAndReconfigure(serializer, () -> createSerializerWithConfig(config));
+
+ BaseArray outputArray;
+ try (ByteArrayInputStream in = new ByteArrayInputStream(serialized)) {
+ outputArray = restoreSerializer.deserialize(new DataInputViewStreamWrapper(in));
+ }
+
+ TypeSerializer restoreEleSer = restoreSerializer.getEleSer();
+ assertEquals(serializer.getEleSer(), restoreEleSer);
+
+ MyObj outputObj = BinaryGeneric.getJavaObjectFromBinaryGeneric(
+ outputArray.getGeneric(0), new KryoSerializer<>(MyObj.class, config));
+ assertEquals(inputObj, outputObj);
+ }
+
+ private BaseArraySerializer createSerializerWithConfig(ExecutionConfig config) {
+ return new BaseArraySerializer(
+ DataTypes.ANY(TypeInformation.of(MyObj.class)).getLogicalType(), config);
+ }
+
@Override
protected BaseArraySerializer createSerializer() {
return new BaseArraySerializer(DataTypes.STRING().getLogicalType(), new ExecutionConfig());
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
index 38216dc..6c7710b 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
@@ -18,20 +18,36 @@
package org.apache.flink.table.typeutils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.dataformat.BaseMap;
import org.apache.flink.table.dataformat.BinaryArray;
import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryGeneric;
import org.apache.flink.table.dataformat.BinaryMap;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.GenericMap;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.testutils.DeeplyEqualsChecker;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObj;
+import static org.apache.flink.table.typeutils.SerializerTestUtil.MyObjSerializer;
+import static org.apache.flink.table.typeutils.SerializerTestUtil.snapshotAndReconfigure;
+import static org.junit.Assert.assertEquals;
+
/**
* A test for the {@link BaseMapSerializer}.
*/
@@ -55,8 +71,56 @@ public class BaseMapSerializerTest extends SerializerTestBase<BaseMap> {
));
}
+ @Test
+ public void testExecutionConfigWithKryo() throws Exception {
+ // serialize base map
+ ExecutionConfig config = new ExecutionConfig();
+ config.enableForceKryo();
+ config.registerTypeWithKryoSerializer(MyObj.class, new MyObjSerializer());
+ final BaseMapSerializer serializer = createSerializerWithConfig(config);
+
+ int inputKey = 998244353;
+ MyObj inputObj = new MyObj(114514, 1919810);
+ Map<Object, Object> javaMap = new HashMap<>();
+ javaMap.put(inputKey, new BinaryGeneric<>(inputObj, new KryoSerializer<>(MyObj.class, config)));
+ BaseMap inputMap = new GenericMap(javaMap);
+
+ byte[] serialized;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ serializer.serialize(inputMap, new DataOutputViewStreamWrapper(out));
+ serialized = out.toByteArray();
+ }
+
+ // deserialize base map using restored serializer
+ final BaseMapSerializer restoreSerializer =
+ (BaseMapSerializer) snapshotAndReconfigure(serializer, () -> createSerializerWithConfig(config));
+
+ BaseMap outputMap;
+ try (ByteArrayInputStream in = new ByteArrayInputStream(serialized)) {
+ outputMap = restoreSerializer.deserialize(new DataInputViewStreamWrapper(in));
+ }
+
+ TypeSerializer restoreKeySer = restoreSerializer.getKeySerializer();
+ TypeSerializer restoreValueSer = restoreSerializer.getValueSerializer();
+ assertEquals(serializer.getKeySerializer(), restoreKeySer);
+ assertEquals(serializer.getValueSerializer(), restoreValueSer);
+
+ MyObj outputObj = BinaryGeneric.getJavaObjectFromBinaryGeneric(
+ (BinaryGeneric) outputMap.toJavaMap(
+ DataTypes.INT().getLogicalType(), DataTypes.ANY(TypeInformation.of(MyObj.class)).getLogicalType())
+ .get(inputKey), new KryoSerializer<>(MyObj.class, config));
+ assertEquals(inputObj, outputObj);
+ }
+
+ private BaseMapSerializer createSerializerWithConfig(ExecutionConfig config) {
+ return new BaseMapSerializer(
+ DataTypes.INT().getLogicalType(),
+ DataTypes.ANY(TypeInformation.of(MyObj.class)).getLogicalType(),
+ config);
+ }
+
private static BaseMapSerializer newSer() {
- return new BaseMapSerializer(INT, STRING);
+ return new BaseMapSerializer(INT, STRING, new ExecutionConfig());
}
@Override
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/SerializerTestUtil.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/SerializerTestUtil.java
new file mode 100644
index 0000000..0587e07
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/SerializerTestUtil.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Utils for testing serializers.
+ */
+public class SerializerTestUtil {
+
+ /**
+ * Snapshot and restore the given serializer. Returns the restored serializer.
+ */
+ public static <T> TypeSerializer<T> snapshotAndReconfigure(
+ TypeSerializer<T> serializer, SerializerGetter<T> serializerGetter) throws IOException {
+ TypeSerializerSnapshot<T> configSnapshot = serializer.snapshotConfiguration();
+
+ byte[] serializedConfig;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+ new DataOutputViewStreamWrapper(out), configSnapshot, serializer);
+ serializedConfig = out.toByteArray();
+ }
+
+ TypeSerializerSnapshot<T> restoredConfig;
+ try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+ restoredConfig = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+ new DataInputViewStreamWrapper(in),
+ Thread.currentThread().getContextClassLoader(),
+ serializerGetter.getSerializer());
+ }
+
+ TypeSerializerSchemaCompatibility<T> strategy =
+ restoredConfig.resolveSchemaCompatibility(serializerGetter.getSerializer());
+ final TypeSerializer<T> restoredSerializer;
+ if (strategy.isCompatibleAsIs()) {
+ restoredSerializer = restoredConfig.restoreSerializer();
+ }
+ else if (strategy.isCompatibleWithReconfiguredSerializer()) {
+ restoredSerializer = strategy.getReconfiguredSerializer();
+ }
+ else {
+ throw new AssertionError("Unable to restore serializer with " + strategy);
+ }
+ assertEquals(serializer.getClass(), restoredSerializer.getClass());
+
+ return restoredSerializer;
+ }
+
+ /**
+ * Supplies the new serializer that {@code snapshotAndReconfigure} uses when restoring.
+ */
+ public interface SerializerGetter<T> {
+ TypeSerializer<T> getSerializer();
+ }
+
+ /**
+ * A simple POJO.
+ */
+ public static class MyObj {
+ private int a;
+ private int b;
+
+ MyObj(int a, int b) {
+ this.a = a;
+ this.b = b;
+ }
+
+ int getA() {
+ return a;
+ }
+
+ int getB() {
+ return b;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof MyObj && ((MyObj) o).a == a && ((MyObj) o).b == b;
+ }
+ }
+
+ /**
+ * Kryo serializer for POJO.
+ */
+ public static class MyObjSerializer extends Serializer<MyObj> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static final int delta = 7;
+
+ @Override
+ public void write(Kryo kryo, Output output, MyObj myObj) {
+ output.writeInt(myObj.getA() + delta);
+ output.writeInt(myObj.getB() + delta);
+ }
+
+ @Override
+ public MyObj read(Kryo kryo, Input input, Class<MyObj> aClass) {
+ int a = input.readInt() - delta;
+ int b = input.readInt() - delta;
+ return new MyObj(a, b);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof MyObjSerializer;
+ }
+ }
+}