You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/07 19:47:06 UTC

[8/8] flink git commit: [FLINK-6178] [core] Introduce TypeDeserializer interface for CompatibilityResult

[FLINK-6178] [core] Introduce TypeDeserializer interface for CompatibilityResult

Previously, the CompatibilityResult class accepted a full-blown
TypeSerializer as its convert deserializer, even though that object
is only ever used for deserialization.

This commit narrows down the interface by introducing a new
TypeDeserializer interface that contains only the read methods.

This closes #3834.
This closes #3804.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63c04a51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63c04a51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63c04a51

Branch: refs/heads/master
Commit: 63c04a516f40ec2dca4d8edef58e7c2ef563ce67
Parents: 8aa5e05
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon May 8 02:42:02 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 8 03:44:32 2017 +0800

----------------------------------------------------------------------
 .../common/typeutils/CompatibilityResult.java   |   8 +-
 .../api/common/typeutils/TypeDeserializer.java  |  88 +++++++++++++
 .../typeutils/TypeDeserializerAdapter.java      | 127 +++++++++++++++++++
 .../api/common/typeutils/TypeSerializer.java    |   4 +-
 .../typeutils/base/GenericArraySerializer.java  |   3 +-
 .../common/typeutils/base/ListSerializer.java   |   3 +-
 .../common/typeutils/base/MapSerializer.java    |   5 +-
 .../typeutils/runtime/EitherSerializer.java     |   5 +-
 .../java/typeutils/runtime/RowSerializer.java   |   4 +-
 .../AbstractKeyedCEPPatternOperator.java        |   4 +-
 .../table/runtime/types/CRowSerializer.scala    |   3 +-
 .../runtime/state/ArrayListSerializer.java      |   3 +-
 .../flink/runtime/state/HashMapSerializer.java  |   5 +-
 .../api/scala/typeutils/EitherSerializer.scala  |   6 +-
 .../api/scala/typeutils/OptionSerializer.scala  |   3 +-
 .../MultiplexingStreamRecordSerializer.java     |   4 +-
 .../streamrecord/StreamRecordSerializer.java    |   4 +-
 .../streamrecord/StreamElementSerializer.java   |   4 +-
 18 files changed, 258 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
index cfbb516..891cfe0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
@@ -38,7 +38,7 @@ public final class CompatibilityResult<T> {
 	 *
 	 * <p>This is only relevant if migration is required.
 	 */
-	private final TypeSerializer<T> convertDeserializer;
+	private final TypeDeserializer<T> convertDeserializer;
 
 	/**
 	 * Returns a strategy that signals that the new serializer is compatible and no migration is required.
@@ -61,16 +61,16 @@ public final class CompatibilityResult<T> {
 	 *
 	 * @return a result that signals migration is necessary, possibly providing a convert deserializer.
 	 */
-	public static <T> CompatibilityResult<T> requiresMigration(TypeSerializer<T> convertDeserializer) {
+	public static <T> CompatibilityResult<T> requiresMigration(TypeDeserializer<T> convertDeserializer) {
 		return new CompatibilityResult<>(true, convertDeserializer);
 	}
 
-	private CompatibilityResult(boolean requiresMigration, TypeSerializer<T> convertDeserializer) {
+	private CompatibilityResult(boolean requiresMigration, TypeDeserializer<T> convertDeserializer) {
 		this.requiresMigration = requiresMigration;
 		this.convertDeserializer = convertDeserializer;
 	}
 
-	public TypeSerializer<T> getConvertDeserializer() {
+	public TypeDeserializer<T> getConvertDeserializer() {
 		return convertDeserializer;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializer.java
new file mode 100644
index 0000000..2ec064a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.IOException;
+
+/**
+ * This interface describes the methods that are required for a data type to be read by the Flink runtime.
+ * Specifically, this interface contains the deserialization methods. In contrast, the {@link TypeSerializer}
+ * class contains the complete set of methods for both serialization and deserialization.
+ *
+ * <p>The methods in this interface are assumed to be stateless, such that implementations are effectively
+ * thread-safe. Stateful implementations of the methods may lead to unpredictable side effects and will
+ * compromise both stability and correctness of the program.
+ *
+ * @param <T> The data type that the deserializer deserializes.
+ */
+public interface TypeDeserializer<T> {
+
+	/**
+	 * Creates a deep copy of this deserializer if it is necessary, i.e. if it is stateful. This
+	 * can return itself if the deserializer is not stateful.
+	 *
+	 * <p>We need this because deserializers might be used in several threads. Stateless deserializers
+	 * are inherently thread-safe while stateful deserializers might not be thread-safe.
+	 */
+	TypeSerializer<T> duplicate(); // NOTE(review): return type is TypeSerializer, not TypeDeserializer - confirm intended
+
+	/**
+	 * De-serializes a record from the given source input view.
+	 *
+	 * @param source The input view from which to read the data.
+	 * @return The deserialized element.
+	 *
+	 * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
+	 *                     input view, which may have an underlying I/O channel from which it reads.
+	 */
+	T deserialize(DataInputView source) throws IOException;
+
+	/**
+	 * De-serializes a record from the given source input view into the given reuse record instance if mutable.
+	 *
+	 * @param reuse The record instance into which to de-serialize the data.
+	 * @param source The input view from which to read the data.
+	 * @return The deserialized element.
+	 *
+	 * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
+	 *                     input view, which may have an underlying I/O channel from which it reads.
+	 */
+	T deserialize(T reuse, DataInputView source) throws IOException;
+
+	/**
+	 * Gets the length of the data type, if it is a fixed-length data type.
+	 *
+	 * @return The length of the data type, or <code>-1</code> for variable length data types.
+	 */
+	int getLength();
+
+	/**
+	 * Returns true if the given object can be equaled with this object. If not, it returns false.
+	 *
+	 * @param obj Object which wants to take part in the equality relation
+	 * @return true if obj can be equaled with this, otherwise false
+	 */
+	boolean canEqual(Object obj);
+
+	boolean equals(Object obj);
+
+	int hashCode();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
new file mode 100644
index 0000000..e02bed4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A utility class that wraps a {@link TypeDeserializer} as a {@link TypeSerializer}.
+ *
+ * <p>The methods declared by {@link TypeDeserializer} are forwarded directly to the wrapped deserializer,
+ * while all remaining {@link TypeSerializer} methods throw {@link UnsupportedOperationException}.
+ *
+ * @param <T> The data type that the deserializer deserializes.
+ */
+public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The actual wrapped deserializer instance. */
+	private final TypeDeserializer<T> deserializer;
+
+	/**
+	 * Creates a {@link TypeSerializer} that wraps a {@link TypeDeserializer}.
+	 *
+	 * @param deserializer the actual deserializer to wrap; passed through a non-null precondition check.
+	 */
+	public TypeDeserializerAdapter(TypeDeserializer<T> deserializer) {
+		this.deserializer = Preconditions.checkNotNull(deserializer);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Methods forwarded unchanged to the wrapped deserializer
+	// --------------------------------------------------------------------------------------------
+
+	public T deserialize(DataInputView source) throws IOException {
+		return deserializer.deserialize(source);
+	}
+
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		return deserializer.deserialize(reuse, source);
+	}
+
+	public TypeSerializer<T> duplicate() {
+		return deserializer.duplicate(); // returns the wrapped instance's duplicate as-is, not a new adapter
+	}
+
+	public int getLength() {
+		return deserializer.getLength();
+	}
+
+	public boolean equals(Object obj) {
+		return deserializer.equals(obj); // NOTE(review): equality is decided solely by the wrapped instance, so adapter.equals(adapter) depends on it - confirm intended
+	}
+
+	public boolean canEqual(Object obj) {
+		return deserializer.canEqual(obj);
+	}
+
+	public int hashCode() {
+		return deserializer.hashCode();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Unsupported methods; each one throws UnsupportedOperationException
+	// --------------------------------------------------------------------------------------------
+
+	public boolean isImmutableType() {
+		throw new UnsupportedOperationException(
+			"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
+	}
+
+	public T createInstance() {
+		throw new UnsupportedOperationException(
+			"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
+	}
+
+	public T copy(T from) {
+		throw new UnsupportedOperationException(
+			"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
+	}
+
+	public T copy(T from, T reuse) {
+		throw new UnsupportedOperationException(
+			"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
+	}
+
+	public void serialize(T record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException(
+			"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
+	}
+
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException(
+			"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
+	}
+
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		throw new UnsupportedOperationException(
+			"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
+	}
+
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		throw new UnsupportedOperationException(
+			"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index f0562d4..0b5a08a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -36,7 +36,7 @@ import java.io.Serializable;
  * @param <T> The data type that the serializer serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializer<T> implements Serializable {
+public abstract class TypeSerializer<T> implements TypeDeserializer<T>, Serializable {
 	
 	private static final long serialVersionUID = 1L;
 
@@ -197,7 +197,7 @@ public abstract class TypeSerializer<T> implements Serializable {
 	 *     has been reconfigured to be compatible, to continue reading previous data, and that the
 	 *     serialization schema remains the same. No migration needs to be performed.</li>
 	 *
-	 *     <li>{@link CompatibilityResult#requiresMigration(TypeSerializer)}: this signals Flink that
+	 *     <li>{@link CompatibilityResult#requiresMigration(TypeDeserializer)}: this signals Flink that
 	 *     migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be
 	 *     compatible, for previous data. Furthermore, in the case that the preceding serializer cannot be found or
 	 *     restored to read the previous data to perform the migration, the provided convert deserializer can be

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index fe61ab3..3e592b4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Array;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -213,7 +214,7 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 					return CompatibilityResult.requiresMigration(
 						new GenericArraySerializer<>(
 							componentClass,
-							compatResult.getConvertDeserializer()));
+							new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index 02d22de..1b6540c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -188,7 +189,7 @@ public final class ListSerializer<T> extends TypeSerializer<List<T>> {
 				return CompatibilityResult.compatible();
 			} else if (compatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
-					new ListSerializer<>(compatResult.getConvertDeserializer()));
+					new ListSerializer<>(new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index 50900e4..182fff6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -220,8 +221,8 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
 			} else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
 					new MapSerializer<>(
-						keyCompatResult.getConvertDeserializer(),
-						valueCompatResult.getConvertDeserializer()));
+						new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
+						new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer())));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index c025d61..461dd87 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -212,8 +213,8 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
 				if (leftCompatResult.getConvertDeserializer() != null && rightCompatResult.getConvertDeserializer() != null) {
 					return CompatibilityResult.requiresMigration(
 						new EitherSerializer<>(
-							leftCompatResult.getConvertDeserializer(),
-							rightCompatResult.getConvertDeserializer()));
+							new TypeDeserializerAdapter<>(leftCompatResult.getConvertDeserializer()),
+							new TypeDeserializerAdapter<>(rightCompatResult.getConvertDeserializer())));
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index 5770dac..075c9d3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
@@ -276,7 +277,8 @@ public final class RowSerializer extends TypeSerializer<Row> {
 							// one of the field serializers cannot provide a fallback deserializer
 							return CompatibilityResult.requiresMigration(null);
 						} else {
-							convertDeserializers[i] = compatResult.getConvertDeserializer();
+							convertDeserializers[i] =
+								new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
 						}
 					}
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 14235dc..140e091 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
@@ -516,7 +517,8 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 					return CompatibilityResult.compatible();
 				} else if (compatResult.getConvertDeserializer() != null) {
 					return CompatibilityResult.requiresMigration(
-						new PriorityQueueSerializer<>(compatResult.getConvertDeserializer(), factory));
+						new PriorityQueueSerializer<>(
+							new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), factory));
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index 7ffa57c..122f4fb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -96,7 +96,8 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
         if (compatResult.requiresMigration()) {
           if (compatResult.getConvertDeserializer != null) {
             CompatibilityResult.requiresMigration(
-              new CRowSerializer(compatResult.getConvertDeserializer)
+              new CRowSerializer(
+                new TypeDeserializerAdapter(compatResult.getConvertDeserializer))
             )
           } else {
             CompatibilityResult.requiresMigration(null)

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 8fbc227..c39cb9b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
@@ -155,7 +156,7 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 				return CompatibilityResult.compatible();
 			} else if (compatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
-					new ArrayListSerializer<>(compatResult.getConvertDeserializer()));
+					new ArrayListSerializer<>(new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
index d52c207..925fe78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot;
@@ -221,8 +222,8 @@ public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>>
 			} else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
 					new HashMapSerializer<>(
-						keyCompatResult.getConvertDeserializer(),
-						valueCompatResult.getConvertDeserializer()));
+						new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
+						new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer())));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 468fddc..88b2041 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
+import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeDeserializerAdapter, TypeSerializer, TypeSerializerConfigSnapshot}
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializerConfigSnapshot
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
@@ -133,8 +133,8 @@ class EitherSerializer[A, B, T <: Either[A, B]](
 
             CompatibilityResult.requiresMigration(
               new EitherSerializer[A, B, T](
-                leftCompatResult.getConvertDeserializer,
-                rightCompatResult.getConvertDeserializer
+                new TypeDeserializerAdapter(leftCompatResult.getConvertDeserializer),
+                new TypeDeserializerAdapter(rightCompatResult.getConvertDeserializer)
               )
             )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index d2bb098..81b3bcc 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -114,7 +114,8 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
         if (compatResult.requiresMigration()) {
           if (compatResult.getConvertDeserializer != null) {
             CompatibilityResult.requiresMigration(
-              new OptionSerializer[A](compatResult.getConvertDeserializer))
+              new OptionSerializer[A](
+                new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
           } else {
             CompatibilityResult.requiresMigration(null)
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 53fea46..552ffd0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -228,7 +229,8 @@ public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Stream
 				return CompatibilityResult.compatible();
 			} else if (compatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
-					new MultiplexingStreamRecordSerializer<>(compatResult.getConvertDeserializer()));
+					new MultiplexingStreamRecordSerializer<>(
+						new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 2a87f4e..f7a661e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -167,7 +168,8 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 				return CompatibilityResult.requiresMigration(null);
 			} else if (compatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
-					new StreamRecordSerializer<>(compatResult.getConvertDeserializer()));
+					new StreamRecordSerializer<>(
+						new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index 5c52fa6..e444ced 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -289,7 +290,8 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 				return CompatibilityResult.compatible();
 			} else if (compatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
-					new StreamElementSerializer<>(compatResult.getConvertDeserializer()));
+					new StreamElementSerializer<>(
+						new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
 			}
 		}