You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/22 15:30:01 UTC

[3/3] flink git commit: [FLINK-6482] [core] Add nested serializers to config snapshots of composite serializers

[FLINK-6482] [core] Add nested serializers to config snapshots of composite serializers

This commit adds also the nested serializers themselves to the
configuration snapshots of composite serializers. This opens up the
oppurtunity to use the previous nested serializer as the convert
deserializer in the case that a nested serializer in the new serializer
determines that state migration is required.

This commit also consolidate all TypeSerializer-related serialization
proxies into a single utility class.

This closes #3937.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7bc5de9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7bc5de9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7bc5de9

Branch: refs/heads/master
Commit: a7bc5de9b17dde793c6da0e7cb700004af117148
Parents: 5624c70
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu May 18 14:51:33 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 22 23:24:26 2017 +0800

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  19 +-
 .../api/common/typeutils/CompatibilityUtil.java |  83 ++++
 .../CompositeTypeSerializerConfigSnapshot.java  |  45 +-
 .../TypeSerializerSerializationProxy.java       | 244 ----------
 .../TypeSerializerSerializationUtil.java        | 446 +++++++++++++++++++
 .../common/typeutils/TypeSerializerUtil.java    | 203 ---------
 .../UnloadableDummyTypeSerializer.java          | 130 ++++++
 .../CollectionSerializerConfigSnapshot.java     |  10 +-
 .../typeutils/base/GenericArraySerializer.java  |  15 +-
 .../GenericArraySerializerConfigSnapshot.java   |   6 +-
 .../common/typeutils/base/ListSerializer.java   |  15 +-
 .../common/typeutils/base/MapSerializer.java    |  27 +-
 .../base/MapSerializerConfigSnapshot.java       |  11 +-
 .../typeutils/runtime/EitherSerializer.java     |  27 +-
 .../runtime/EitherSerializerConfigSnapshot.java |  11 +-
 .../java/typeutils/runtime/PojoSerializer.java  | 397 ++++++++++++-----
 .../java/typeutils/runtime/RowSerializer.java   |  29 +-
 .../java/typeutils/runtime/TupleSerializer.java |   5 +
 .../typeutils/runtime/TupleSerializerBase.java  |  52 ++-
 .../runtime/TupleSerializerConfigSnapshot.java  |   9 +-
 .../common/typeutils/SerializerTestBase.java    |   4 +-
 .../TypeSerializerConfigSnapshotTest.java       | 147 ------
 .../TypeSerializerSerializationProxyTest.java   | 142 ------
 .../TypeSerializerSerializationUtilTest.java    | 295 ++++++++++++
 .../typeutils/base/EnumSerializerTest.java      |   6 +-
 .../typeutils/runtime/PojoSerializerTest.java   | 146 +++++-
 .../kryo/KryoSerializerCompatibilityTest.java   |  10 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  41 +-
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  48 +-
 .../AbstractKeyedCEPPatternOperator.java        |  15 +-
 .../table/runtime/types/CRowSerializer.scala    |  14 +-
 .../runtime/state/ArrayListSerializer.java      |  15 +-
 .../state/DefaultOperatorStateBackend.java      |   5 +-
 .../flink/runtime/state/HashMapSerializer.java  |  27 +-
 .../state/KeyedBackendSerializationProxy.java   |  58 +--
 ...ckendStateMetaInfoSnapshotReaderWriters.java | 108 ++---
 ...ckendStateMetaInfoSnapshotReaderWriters.java |  63 +--
 .../flink/runtime/state/StateMigrationUtil.java |  83 ----
 .../state/heap/HeapKeyedStateBackend.java       |  17 +-
 .../runtime/state/MemoryStateBackendTest.java   |  14 +-
 .../runtime/state/OperatorStateBackendTest.java |   9 +-
 .../runtime/state/SerializationProxiesTest.java |  22 +-
 .../api/scala/codegen/TypeInformationGen.scala  |  11 +
 .../org/apache/flink/api/scala/package.scala    |  22 +-
 .../api/scala/typeutils/EitherSerializer.scala  |  29 +-
 .../api/scala/typeutils/OptionSerializer.scala  |  20 +-
 .../api/scala/typeutils/TrySerializer.scala     |  36 +-
 .../MultiplexingStreamRecordSerializer.java     |  21 +-
 .../streamrecord/StreamRecordSerializer.java    |  21 +-
 .../streamrecord/StreamElementSerializer.java   |  21 +-
 50 files changed, 1882 insertions(+), 1372 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 1f32a89..51255ab 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -64,7 +65,6 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StateMigrationUtil;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -1111,9 +1111,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			// check for key serializer compatibility; this also reconfigures the
 			// key serializer to be compatible, if it is required and is possible
-			if (StateMigrationUtil.resolveCompatibilityResult(
+			if (CompatibilityUtil.resolveCompatibilityResult(
 					serializationProxy.getKeySerializer(),
-					TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+					UnloadableDummyTypeSerializer.class,
 					serializationProxy.getKeySerializerConfigSnapshot(),
 					rocksDBKeyedStateBackend.keySerializer)
 				.isRequiresMigration()) {
@@ -1230,9 +1230,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				// check for key serializer compatibility; this also reconfigures the
 				// key serializer to be compatible, if it is required and is possible
-				if (StateMigrationUtil.resolveCompatibilityResult(
+				if (CompatibilityUtil.resolveCompatibilityResult(
 						serializationProxy.getKeySerializer(),
-						TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+						UnloadableDummyTypeSerializer.class,
 						serializationProxy.getKeySerializerConfigSnapshot(),
 						stateBackend.keySerializer)
 					.isRequiresMigration()) {
@@ -1532,16 +1532,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 
 			// check compatibility results to determine if state migration is required
-
-			CompatibilityResult<?> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult(
+			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
 					restoredMetaInfo.getNamespaceSerializer(),
 					MigrationNamespaceSerializerProxy.class,
 					restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
 					newMetaInfo.getNamespaceSerializer());
 
-			CompatibilityResult<S> stateCompatibility = StateMigrationUtil.resolveCompatibilityResult(
+			CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
 					restoredMetaInfo.getStateSerializer(),
-					TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+					UnloadableDummyTypeSerializer.class,
 					restoredMetaInfo.getStateSerializerConfigSnapshot(),
 					newMetaInfo.getStateSerializer());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
new file mode 100644
index 0000000..8959628
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Utilities related to serializer compatibility.
+ */
+@Internal
+public class CompatibilityUtil {
+
+	/**
+	 * Resolves the final compatibility result of two serializers by taking into account compound information,
+	 * including the preceding serializer, the preceding serializer's configuration snapshot, and the new serializer.
+	 *
+	 * The final result is determined as follows:
+	 *   1. If there is no configuration snapshot of the preceding serializer,
+	 *      assumes the new serializer to be compatible.
+	 *   2. Confront the configuration snapshot with the new serializer.
+	 *   3. If the result is compatible, just return that as the result.
+	 *   4. If not compatible and requires migration, check if the preceding serializer is valid.
+	 *      If yes, use that as the convert deserializer for state migration.
+	 *   5. If the preceding serializer is not valid, check if the result came with a convert deserializer.
+	 *      If yes, use that for state migration and simply return the result.
+	 *   6. If all of above fails, state migration is required but could not be performed; throw exception.
+	 *
+	 * @param precedingSerializer the preceding serializer used to write the data
+	 * @param dummySerializerClassTag any class tags that identifies the preceding serializer as a dummy placeholder
+	 * @param precedingSerializerConfigSnapshot configuration snapshot of the preceding serializer
+	 * @param newSerializer the new serializer to ensure compatibility with
+	 *
+	 * @param <T> Type of the data handled by the serializers
+	 * 
+	 * @return the final resolved compatibility result
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> CompatibilityResult<T> resolveCompatibilityResult(
+			TypeSerializer<?> precedingSerializer,
+			Class<?> dummySerializerClassTag,
+			TypeSerializerConfigSnapshot precedingSerializerConfigSnapshot,
+			TypeSerializer<T> newSerializer) {
+
+		if (precedingSerializerConfigSnapshot != null) {
+			CompatibilityResult<T> initialResult = newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);
+
+			if (!initialResult.isRequiresMigration()) {
+				return initialResult;
+			} else {
+				if (precedingSerializer != null && !(precedingSerializer.getClass().equals(dummySerializerClassTag))) {
+					// if the preceding serializer exists and is not a dummy, use
+					// that for converting instead of the provided convert deserializer
+					return CompatibilityResult.requiresMigration((TypeSerializer<T>) precedingSerializer);
+				} else if (initialResult.getConvertDeserializer() != null) {
+					return initialResult;
+				} else {
+					throw new RuntimeException(
+						"State migration required, but there is no available serializer capable of reading previous data.");
+				}
+			}
+		} else {
+			// if the configuration snapshot of the preceding serializer cannot be provided,
+			// we can only simply assume that the new serializer is compatible
+			return CompatibilityResult.compatible();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
index e7e2650..45b78c1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
@@ -19,47 +19,64 @@
 package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * A {@link TypeSerializerConfigSnapshot} for serializers that has multiple nested serializers.
- * The configuration snapshot consists of the configuration snapshots of all nested serializers.
+ * The configuration snapshot consists of the configuration snapshots of all nested serializers, and
+ * also the nested serializers themselves.
+ *
+ * <p>Both the nested serializers and the configuration snapshots are written as configuration of
+ * composite serializers, so that on restore, the previous serializer may be used in case migration
+ * is required.
  */
 @Internal
 public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
 
-	private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots;
+	private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nestedSerializersAndConfigs;
 
 	/** This empty nullary constructor is required for deserializing the configuration. */
 	public CompositeTypeSerializerConfigSnapshot() {}
 
-	public CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... nestedSerializerConfigSnapshots) {
-		this.nestedSerializerConfigSnapshots = Preconditions.checkNotNull(nestedSerializerConfigSnapshots);
+	public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... nestedSerializers) {
+		Preconditions.checkNotNull(nestedSerializers);
+
+		this.nestedSerializersAndConfigs = new ArrayList<>(nestedSerializers.length);
+		for (TypeSerializer<?> nestedSerializer : nestedSerializers) {
+			TypeSerializerConfigSnapshot configSnapshot = nestedSerializer.snapshotConfiguration();
+			this.nestedSerializersAndConfigs.add(
+				new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+					nestedSerializer.duplicate(),
+					Preconditions.checkNotNull(configSnapshot)));
+		}
 	}
 
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		super.write(out);
-		TypeSerializerUtil.writeSerializerConfigSnapshots(out, nestedSerializerConfigSnapshots);
+		TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(out, nestedSerializersAndConfigs);
 	}
 
 	@Override
 	public void read(DataInputView in) throws IOException {
 		super.read(in);
-		nestedSerializerConfigSnapshots = TypeSerializerUtil.readSerializerConfigSnapshots(in, getUserCodeClassLoader());
+		this.nestedSerializersAndConfigs =
+			TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, getUserCodeClassLoader());
 	}
 
-	public TypeSerializerConfigSnapshot[] getNestedSerializerConfigSnapshots() {
-		return nestedSerializerConfigSnapshots;
+	public List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getNestedSerializersAndConfigs() {
+		return nestedSerializersAndConfigs;
 	}
 
-	public TypeSerializerConfigSnapshot getSingleNestedSerializerConfigSnapshot() {
-		return nestedSerializerConfigSnapshots[0];
+	public Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> getSingleNestedSerializerAndConfig() {
+		return nestedSerializersAndConfigs.get(0);
 	}
 
 	@Override
@@ -73,13 +90,11 @@ public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializ
 		}
 
 		return (obj.getClass().equals(getClass()))
-				&& Arrays.equals(
-					nestedSerializerConfigSnapshots,
-					((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots());
+				&& nestedSerializersAndConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs());
 	}
 
 	@Override
 	public int hashCode() {
-		return Arrays.hashCode(nestedSerializerConfigSnapshots);
+		return nestedSerializersAndConfigs.hashCode();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
deleted file mode 100644
index 067a1ca..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.io.VersionedIOReadableWritable;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InvalidClassException;
-import java.util.Arrays;
-
-@Internal
-public class TypeSerializerSerializationProxy<T> extends VersionedIOReadableWritable {
-
-	public static final int VERSION = 1;
-	private static final Logger LOG = LoggerFactory.getLogger(TypeSerializerSerializationProxy.class);
-
-	private ClassLoader userClassLoader;
-
-	private TypeSerializer<T> typeSerializer;
-
-	private boolean ignoreClassNotFound;
-
-	public TypeSerializerSerializationProxy(ClassLoader userClassLoader, boolean ignoreClassNotFound) {
-		this.userClassLoader = userClassLoader;
-		this.ignoreClassNotFound = ignoreClassNotFound;
-	}
-
-	public TypeSerializerSerializationProxy(ClassLoader userClassLoader) {
-		this(userClassLoader, false);
-	}
-
-	public TypeSerializerSerializationProxy(TypeSerializer<T> typeSerializer) {
-		this.typeSerializer = Preconditions.checkNotNull(typeSerializer);
-		this.ignoreClassNotFound = false;
-	}
-
-	public TypeSerializer<T> getTypeSerializer() {
-		return typeSerializer;
-	}
-
-	@Override
-	public int getVersion() {
-		return VERSION;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		super.write(out);
-
-		if (typeSerializer instanceof ClassNotFoundDummyTypeSerializer) {
-			ClassNotFoundDummyTypeSerializer<T> dummyTypeSerializer =
-					(ClassNotFoundDummyTypeSerializer<T>) this.typeSerializer;
-
-			byte[] serializerBytes = dummyTypeSerializer.getActualBytes();
-			out.write(serializerBytes.length);
-			out.write(serializerBytes);
-		} else {
-			// write in a way that allows the stream to recover from exceptions
-			try (ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos()) {
-				InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
-				out.writeInt(streamWithPos.getPosition());
-				out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
-			}
-		}
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		super.read(in);
-
-		// read in a way that allows the stream to recover from exceptions
-		int serializerBytes = in.readInt();
-		byte[] buffer = new byte[serializerBytes];
-		in.readFully(buffer);
-		try {
-			typeSerializer = InstantiationUtil.deserializeObject(buffer, userClassLoader);
-		} catch (ClassNotFoundException | InvalidClassException e) {
-			if (ignoreClassNotFound) {
-				// we create a dummy so that all the information is not lost when we get a new checkpoint before receiving
-				// a proper typeserializer from the user
-				typeSerializer =
-						new ClassNotFoundDummyTypeSerializer<>(buffer);
-				LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e);
-			} else {
-				throw new IOException("Missing class for type serializer.", e);
-			}
-		}
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		TypeSerializerSerializationProxy<?> that = (TypeSerializerSerializationProxy<?>) o;
-
-		return getTypeSerializer() != null ? getTypeSerializer().equals(that.getTypeSerializer()) : that.getTypeSerializer() == null;
-	}
-
-	@Override
-	public int hashCode() {
-		return getTypeSerializer() != null ? getTypeSerializer().hashCode() : 0;
-	}
-
-	public boolean isIgnoreClassNotFound() {
-		return ignoreClassNotFound;
-	}
-
-	public void setIgnoreClassNotFound(boolean ignoreClassNotFound) {
-		this.ignoreClassNotFound = ignoreClassNotFound;
-	}
-
-	/**
-	 * Dummy TypeSerializer to avoid that data is lost when checkpointing again a serializer for which we encountered
-	 * a {@link ClassNotFoundException}.
-	 */
-	public static final class ClassNotFoundDummyTypeSerializer<T> extends TypeSerializer<T> {
-
-		private static final long serialVersionUID = 2526330533671642711L;
-		private final byte[] actualBytes;
-
-		public ClassNotFoundDummyTypeSerializer(byte[] actualBytes) {
-			this.actualBytes = Preconditions.checkNotNull(actualBytes);
-		}
-
-		public byte[] getActualBytes() {
-			return actualBytes;
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
-		}
-
-		@Override
-		public TypeSerializer<T> duplicate() {
-			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
-		}
-
-		@Override
-		public T createInstance() {
-			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
-		}
-
-		@Override
-		public T copy(T from) {
-			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
-		}
-
-		@Override
-		public T copy(T from, T reuse) {
-			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
-		}
-
-		@Override
-		public int getLength() {
-			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
-		}
-
-		@Override
-		public void serialize(T record, DataOutputView target) throws IOException {
-			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
-		}
-
-		@Override
-		public T deserialize(DataInputView source) throws IOException {
-			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
-		}
-
-		@Override
-		public T deserialize(T reuse, DataInputView source) throws IOException {
-			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
-		}
-
-		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
-			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
-		}
-
-		@Override
-		public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-			throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return false;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-
-			ClassNotFoundDummyTypeSerializer<?> that = (ClassNotFoundDummyTypeSerializer<?>) o;
-
-			return Arrays.equals(getActualBytes(), that.getActualBytes());
-		}
-
-		@Override
-		public int hashCode() {
-			return Arrays.hashCode(getActualBytes());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
new file mode 100644
index 0000000..3d79d9a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility methods for serialization of {@link TypeSerializer} and {@link TypeSerializerConfigSnapshot}.
+ */
+@Internal
+public class TypeSerializerSerializationUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TypeSerializerSerializationUtil.class);
+
+	/**
+	 * Writes a {@link TypeSerializer} to the provided data output view.
+	 *
+	 * <p>It is written with a format that can be later read again using
+	 * {@link #tryReadSerializer(DataInputView, ClassLoader, boolean)}.
+	 *
+	 * @param out the data output view.
+	 * @param serializer the serializer to write.
+	 *
+	 * @param <T> Data type of the serializer.
+	 *
+	 * @throws IOException
+	 */
+	public static <T> void writeSerializer(DataOutputView out, TypeSerializer<T> serializer) throws IOException {
+		new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(serializer).write(out);
+	}
+
+	/**
+	 * Reads from a data input view a {@link TypeSerializer} that was previously
+	 * written using {@link #writeSerializer(DataOutputView, TypeSerializer)}.
+	 *
+	 * <p>If deserialization fails for any reason (corrupted serializer bytes, serializer class
+	 * no longer in classpath, serializer class no longer valid, etc.), {@code null} will
+	 * be returned instead.
+	 *
+	 * @param in the data input view.
+	 * @param userCodeClassLoader the user code class loader to use.
+	 *
+	 * @param <T> Data type of the serializer.
+	 *
+	 * @return the deserialized serializer.
+	 */
+	public static <T> TypeSerializer<T> tryReadSerializer(DataInputView in, ClassLoader userCodeClassLoader) {
+		return tryReadSerializer(in, userCodeClassLoader, false);
+	}
+
+	/**
+	 * Reads from a data input view a {@link TypeSerializer} that was previously
+	 * written using {@link #writeSerializer(DataOutputView, TypeSerializer)}.
+	 *
+	 * <p>If deserialization fails due to {@link ClassNotFoundException} or {@link InvalidClassException},
+	 * users can opt to use a dummy {@link UnloadableDummyTypeSerializer} to hold the serializer bytes,
+	 * otherwise {@code null} is returned. If the failure is due to a {@link java.io.StreamCorruptedException},
+	 * then {@code null} is returned.
+	 *
+	 * @param in the data input view.
+	 * @param userCodeClassLoader the user code class loader to use.
+	 * @param useDummyPlaceholder whether or not to use a dummy {@link UnloadableDummyTypeSerializer} to hold the
+	 *                            serializer bytes in the case of a {@link ClassNotFoundException} or
+	 *                            {@link InvalidClassException}.
+	 *
+	 * @param <T> Data type of the serializer.
+	 *
+	 * @return the deserialized serializer.
+	 */
+	public static <T> TypeSerializer<T> tryReadSerializer(DataInputView in, ClassLoader userCodeClassLoader, boolean useDummyPlaceholder) {
+		final TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<T> proxy =
+			new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader, useDummyPlaceholder);
+
+		try {
+			proxy.read(in);
+			return proxy.getTypeSerializer();
+		} catch (IOException e) {
+			LOG.warn("Deserialization of serializer errored; replacing with null.", e);
+
+			return null;
+		}
+	}
+
+	/**
+	 * Write a list of serializers and their corresponding config snapshots to the provided
+	 * data output view. This method writes in a fault tolerant way, so that when read again
+	 * using {@link #readSerializersAndConfigsWithResilience(DataInputView, ClassLoader)}, if
+	 * deserialization of the serializer fails, its configuration snapshot will remain intact.
+	 *
+	 * <p>Specifically, all written serializers and their config snapshots are indexed by their
+	 * offset positions within the serialized bytes. The serialization format is as follows:
+	 * <ul>
+	 *     <li>1. number of serializer and configuration snapshot pairs.</li>
+	 *     <li>2. offsets of each serializer and configuration snapshot, in order.</li>
+	 *     <li>3. total number of bytes for the serialized serializers and the config snapshots.</li>
+	 *     <li>4. serialized serializers and the config snapshots.</li>
+	 * </ul>
+	 *
+	 * @param out the data output view.
+	 * @param serializersAndConfigs serializer and configuration snapshot pairs
+	 *
+	 * @throws IOException
+	 */
+	public static void writeSerializersAndConfigsWithResilience(
+			DataOutputView out,
+			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs) throws IOException {
+
+		try (
+			ByteArrayOutputStreamWithPos bufferWithPos = new ByteArrayOutputStreamWithPos();
+			DataOutputViewStreamWrapper bufferWrapper = new DataOutputViewStreamWrapper(bufferWithPos)) {
+
+			out.writeInt(serializersAndConfigs.size());
+			for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> serAndConfSnapshot : serializersAndConfigs) {
+				out.writeInt(bufferWithPos.getPosition());
+				writeSerializer(bufferWrapper, serAndConfSnapshot.f0);
+
+				out.writeInt(bufferWithPos.getPosition());
+				writeSerializerConfigSnapshot(bufferWrapper, serAndConfSnapshot.f1);
+			}
+
+			out.writeInt(bufferWithPos.getPosition());
+			out.write(bufferWithPos.getBuf(), 0, bufferWithPos.getPosition());
+		}
+	}
+
+	/**
+	 * Reads from a data input view a list of serializers and their corresponding config snapshots
+	 * written using {@link #writeSerializersAndConfigsWithResilience(DataOutputView, List)}.
+	 * This is fault tolerant to any failures when deserializing the serializers. Serializers which
+	 * were not successfully deserialized will be replaced by {@code null}.
+	 *
+	 * @param in the data input view.
+	 * @param userCodeClassLoader the user code class loader to use.
+	 *
+	 * @return the deserialized serializer and config snapshot pairs.
+	 *
+	 * @throws IOException
+	 */
+	public static List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> readSerializersAndConfigsWithResilience(
+			DataInputView in,
+			ClassLoader userCodeClassLoader) throws IOException {
+
+		int numSerializersAndConfigSnapshots = in.readInt();
+
+		int[] offsets = new int[numSerializersAndConfigSnapshots * 2];
+
+		for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
+			offsets[i * 2] = in.readInt();
+			offsets[i * 2 + 1] = in.readInt();
+		}
+
+		int totalBytes = in.readInt();
+		byte[] buffer = new byte[totalBytes];
+		in.readFully(buffer);
+
+		List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigSnapshots =
+			new ArrayList<>(numSerializersAndConfigSnapshots);
+
+		TypeSerializer<?> serializer;
+		TypeSerializerConfigSnapshot configSnapshot;
+		try (
+			ByteArrayInputStreamWithPos bufferWithPos = new ByteArrayInputStreamWithPos(buffer);
+			DataInputViewStreamWrapper bufferWrapper = new DataInputViewStreamWrapper(bufferWithPos)) {
+
+			for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
+
+				bufferWithPos.setPosition(offsets[i * 2]);
+				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
+
+				bufferWithPos.setPosition(offsets[i * 2 + 1]);
+				configSnapshot = readSerializerConfigSnapshot(bufferWrapper, userCodeClassLoader);
+
+				serializersAndConfigSnapshots.add(
+					new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(serializer, configSnapshot));
+			}
+		}
+
+		return serializersAndConfigSnapshots;
+	}
+
+	/**
+	 * Writes a {@link TypeSerializerConfigSnapshot} to the provided data output view.
+	 *
+	 * <p>It is written with a format that can be later read again using
+	 * {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
+	 *
+	 * @param out the data output view
+	 * @param serializerConfigSnapshot the serializer configuration snapshot to write
+	 *
+	 * @throws IOException
+	 */
+	public static void writeSerializerConfigSnapshot(
+			DataOutputView out,
+			TypeSerializerConfigSnapshot serializerConfigSnapshot) throws IOException {
+
+		new TypeSerializerConfigSnapshotSerializationProxy(serializerConfigSnapshot).write(out);
+	}
+
+	/**
+	 * Reads from a data input view a {@link TypeSerializerConfigSnapshot} that was previously
+	 * written using {@link #writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot)}.
+	 *
+	 * @param in the data input view
+	 * @param userCodeClassLoader the user code class loader to use
+	 *
+	 * @return the read serializer configuration snapshot
+	 *
+	 * @throws IOException
+	 */
+	public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
+			DataInputView in,
+			ClassLoader userCodeClassLoader) throws IOException {
+
+		final TypeSerializerConfigSnapshotSerializationProxy proxy = new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+		proxy.read(in);
+
+		return proxy.getSerializerConfigSnapshot();
+	}
+
+	/**
+	 * Writes multiple {@link TypeSerializerConfigSnapshot}s to the provided data output view.
+	 *
+	 * <p>It is written with a format that can be later read again using
+	 * {@link #readSerializerConfigSnapshots(DataInputView, ClassLoader)}.
+	 *
+	 * @param out the data output view
+	 * @param serializerConfigSnapshots the serializer configuration snapshots to write
+	 *
+	 * @throws IOException
+	 */
+	public static void writeSerializerConfigSnapshots(
+			DataOutputView out,
+			TypeSerializerConfigSnapshot... serializerConfigSnapshots) throws IOException {
+
+		out.writeInt(serializerConfigSnapshots.length);
+
+		for (TypeSerializerConfigSnapshot snapshot : serializerConfigSnapshots) {
+			new TypeSerializerConfigSnapshotSerializationProxy(snapshot).write(out);
+		}
+	}
+
+	/**
+	 * Reads from a data input view multiple {@link TypeSerializerConfigSnapshot}s that was previously
+	 * written using {@link #writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot)}.
+	 *
+	 * @param in the data input view
+	 * @param userCodeClassLoader the user code class loader to use
+	 *
+	 * @return the read serializer configuration snapshots
+	 *
+	 * @throws IOException
+	 */
+	public static TypeSerializerConfigSnapshot[] readSerializerConfigSnapshots(
+			DataInputView in,
+			ClassLoader userCodeClassLoader) throws IOException {
+
+		int numFields = in.readInt();
+		final TypeSerializerConfigSnapshot[] serializerConfigSnapshots = new TypeSerializerConfigSnapshot[numFields];
+
+		TypeSerializerConfigSnapshotSerializationProxy proxy;
+		for (int i = 0; i < numFields; i++) {
+			proxy = new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+			proxy.read(in);
+			serializerConfigSnapshots[i] = proxy.getSerializerConfigSnapshot();
+		}
+
+		return serializerConfigSnapshots;
+	}
+
+	// -----------------------------------------------------------------------------------------------------
+
+	/**
+	 * Utility serialization proxy for a {@link TypeSerializer}.
+	 */
+	public static final class TypeSerializerSerializationProxy<T> extends VersionedIOReadableWritable {
+
+		private static final Logger LOG = LoggerFactory.getLogger(TypeSerializerSerializationProxy.class);
+
+		private static final int VERSION = 1;
+
+		private ClassLoader userClassLoader;
+		private TypeSerializer<T> typeSerializer;
+		private boolean useDummyPlaceholder;
+
+		public TypeSerializerSerializationProxy(ClassLoader userClassLoader, boolean useDummyPlaceholder) {
+			this.userClassLoader = userClassLoader;
+			this.useDummyPlaceholder = useDummyPlaceholder;
+		}
+
+		public TypeSerializerSerializationProxy(ClassLoader userClassLoader) {
+			this(userClassLoader, false);
+		}
+
+		public TypeSerializerSerializationProxy(TypeSerializer<T> typeSerializer) {
+			this.typeSerializer = Preconditions.checkNotNull(typeSerializer);
+			this.useDummyPlaceholder = false;
+		}
+
+		public TypeSerializer<T> getTypeSerializer() {
+			return typeSerializer;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+
+			if (typeSerializer instanceof UnloadableDummyTypeSerializer) {
+				UnloadableDummyTypeSerializer<T> dummyTypeSerializer =
+					(UnloadableDummyTypeSerializer<T>) this.typeSerializer;
+
+				byte[] serializerBytes = dummyTypeSerializer.getActualBytes();
+				out.write(serializerBytes.length);
+				out.write(serializerBytes);
+			} else {
+				// write in a way that allows the stream to recover from exceptions
+				try (ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos()) {
+					InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
+					out.writeInt(streamWithPos.getPosition());
+					out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
+				}
+			}
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+
+			// read in a way that allows the stream to recover from exceptions
+			int serializerBytes = in.readInt();
+			byte[] buffer = new byte[serializerBytes];
+			in.readFully(buffer);
+			try {
+				typeSerializer = InstantiationUtil.deserializeObject(buffer, userClassLoader);
+			} catch (ClassNotFoundException | InvalidClassException e) {
+				if (useDummyPlaceholder) {
+					// we create a dummy so that all the information is not lost when we get a new checkpoint before receiving
+					// a proper typeserializer from the user
+					typeSerializer =
+						new UnloadableDummyTypeSerializer<>(buffer);
+					LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e);
+				} else {
+					throw new IOException("Missing class for type serializer.", e);
+				}
+			}
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
+
+	/**
+	 * Utility serialization proxy for a {@link TypeSerializerConfigSnapshot}.
+	 */
+	static final class TypeSerializerConfigSnapshotSerializationProxy extends VersionedIOReadableWritable {
+
+		private static final int VERSION = 1;
+
+		private ClassLoader userCodeClassLoader;
+		private TypeSerializerConfigSnapshot serializerConfigSnapshot;
+
+		TypeSerializerConfigSnapshotSerializationProxy(ClassLoader userCodeClassLoader) {
+			this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+		}
+
+		TypeSerializerConfigSnapshotSerializationProxy(TypeSerializerConfigSnapshot serializerConfigSnapshot) {
+			this.serializerConfigSnapshot = serializerConfigSnapshot;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+
+			// config snapshot class, so that we can re-instantiate the
+			// correct type of config snapshot instance when deserializing
+			out.writeUTF(serializerConfigSnapshot.getClass().getName());
+
+			// the actual configuration parameters
+			serializerConfigSnapshot.write(out);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+
+			String serializerConfigClassname = in.readUTF();
+			Class<? extends TypeSerializerConfigSnapshot> serializerConfigSnapshotClass;
+			try {
+				serializerConfigSnapshotClass = (Class<? extends TypeSerializerConfigSnapshot>)
+					Class.forName(serializerConfigClassname, true, userCodeClassLoader);
+			} catch (ClassNotFoundException e) {
+				throw new IOException(
+					"Could not find requested TypeSerializerConfigSnapshot class "
+						+ serializerConfigClassname +  " in classpath.", e);
+			}
+
+			serializerConfigSnapshot = InstantiationUtil.instantiate(serializerConfigSnapshotClass);
+			serializerConfigSnapshot.setUserCodeClassLoader(userCodeClassLoader);
+			serializerConfigSnapshot.read(in);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		TypeSerializerConfigSnapshot getSerializerConfigSnapshot() {
+			return serializerConfigSnapshot;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
deleted file mode 100644
index 0a2148a..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.io.VersionedIOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * Utility methods for {@link TypeSerializer} and {@link TypeSerializerConfigSnapshot}.
- */
-@Internal
-public class TypeSerializerUtil {
-
-	/**
-	 * Creates an array of {@link TypeSerializerConfigSnapshot}s taken
-	 * from the provided array of {@link TypeSerializer}s.
-	 *
-	 * @param serializers array of type serializers.
-	 *
-	 * @return array of configuration snapshots taken from each serializer.
-	 */
-	public static TypeSerializerConfigSnapshot[] snapshotConfigurations(TypeSerializer<?>[] serializers) {
-		final TypeSerializerConfigSnapshot[] configSnapshots = new TypeSerializerConfigSnapshot[serializers.length];
-
-		for (int i = 0; i < serializers.length; i++) {
-			configSnapshots[i] = serializers[i].snapshotConfiguration();
-		}
-
-		return configSnapshots;
-	}
-
-	/**
-	 * Writes a {@link TypeSerializerConfigSnapshot} to the provided data output view.
-	 *
-	 * <p>It is written with a format that can be later read again using
-	 * {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
-	 *
-	 * @param out the data output view
-	 * @param serializerConfigSnapshot the serializer configuration snapshot to write
-	 *
-	 * @throws IOException
-	 */
-	public static void writeSerializerConfigSnapshot(
-			DataOutputView out,
-			TypeSerializerConfigSnapshot serializerConfigSnapshot) throws IOException {
-
-		new TypeSerializerConfigSnapshotProxy(serializerConfigSnapshot).write(out);
-	}
-
-	/**
-	 * Reads from a data input view a {@link TypeSerializerConfigSnapshot} that was previously
-	 * written using {@link #writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot)}.
-	 *
-	 * @param in the data input view
-	 * @param userCodeClassLoader the user code class loader to use
-	 *
-	 * @return the read serializer configuration snapshot
-	 *
-	 * @throws IOException
-	 */
-	public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
-			DataInputView in,
-			ClassLoader userCodeClassLoader) throws IOException {
-
-		final TypeSerializerConfigSnapshotProxy proxy = new TypeSerializerConfigSnapshotProxy(userCodeClassLoader);
-		proxy.read(in);
-
-		return proxy.getSerializerConfigSnapshot();
-	}
-
-	/**
-	 * Writes multiple {@link TypeSerializerConfigSnapshot}s to the provided data output view.
-	 *
-	 * <p>It is written with a format that can be later read again using
-	 * {@link #readSerializerConfigSnapshots(DataInputView, ClassLoader)}.
-	 *
-	 * @param out the data output view
-	 * @param serializerConfigSnapshots the serializer configuration snapshots to write
-	 *
-	 * @throws IOException
-	 */
-	public static void writeSerializerConfigSnapshots(
-			DataOutputView out,
-			TypeSerializerConfigSnapshot... serializerConfigSnapshots) throws IOException {
-
-		out.writeInt(serializerConfigSnapshots.length);
-
-		for (TypeSerializerConfigSnapshot snapshot : serializerConfigSnapshots) {
-			new TypeSerializerConfigSnapshotProxy(snapshot).write(out);
-		}
-	}
-
-	/**
-	 * Reads from a data input view multiple {@link TypeSerializerConfigSnapshot}s that was previously
-	 * written using {@link #writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot)}.
-	 *
-	 * @param in the data input view
-	 * @param userCodeClassLoader the user code class loader to use
-	 *
-	 * @return the read serializer configuration snapshots
-	 *
-	 * @throws IOException
-	 */
-	public static TypeSerializerConfigSnapshot[] readSerializerConfigSnapshots(
-			DataInputView in,
-			ClassLoader userCodeClassLoader) throws IOException {
-
-		int numFields = in.readInt();
-		final TypeSerializerConfigSnapshot[] serializerConfigSnapshots = new TypeSerializerConfigSnapshot[numFields];
-
-		TypeSerializerConfigSnapshotProxy proxy;
-		for (int i = 0; i < numFields; i++) {
-			proxy = new TypeSerializerConfigSnapshotProxy(userCodeClassLoader);
-			proxy.read(in);
-			serializerConfigSnapshots[i] = proxy.getSerializerConfigSnapshot();
-		}
-
-		return serializerConfigSnapshots;
-	}
-
-	/**
-	 * Utility serialization proxy for a {@link TypeSerializerConfigSnapshot}.
-	 */
-	static class TypeSerializerConfigSnapshotProxy extends VersionedIOReadableWritable {
-
-		private static final int VERSION = 1;
-
-		private ClassLoader userCodeClassLoader;
-		private TypeSerializerConfigSnapshot serializerConfigSnapshot;
-
-		TypeSerializerConfigSnapshotProxy(ClassLoader userCodeClassLoader) {
-			this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
-		}
-
-		TypeSerializerConfigSnapshotProxy(TypeSerializerConfigSnapshot serializerConfigSnapshot) {
-			this.serializerConfigSnapshot = serializerConfigSnapshot;
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			super.write(out);
-
-			// config snapshot class, so that we can re-instantiate the
-			// correct type of config snapshot instance when deserializing
-			out.writeUTF(serializerConfigSnapshot.getClass().getName());
-
-			// the actual configuration parameters
-			serializerConfigSnapshot.write(out);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public void read(DataInputView in) throws IOException {
-			super.read(in);
-
-			String serializerConfigClassname = in.readUTF();
-			Class<? extends TypeSerializerConfigSnapshot> serializerConfigSnapshotClass;
-			try {
-				serializerConfigSnapshotClass = (Class<? extends TypeSerializerConfigSnapshot>)
-					Class.forName(serializerConfigClassname, true, userCodeClassLoader);
-			} catch (ClassNotFoundException e) {
-				throw new IOException(
-					"Could not find requested TypeSerializerConfigSnapshot class "
-						+ serializerConfigClassname +  " in classpath.", e);
-			}
-
-			serializerConfigSnapshot = InstantiationUtil.instantiate(serializerConfigSnapshotClass);
-			serializerConfigSnapshot.setUserCodeClassLoader(userCodeClassLoader);
-			serializerConfigSnapshot.read(in);
-		}
-
-		@Override
-		public int getVersion() {
-			return VERSION;
-		}
-
-		TypeSerializerConfigSnapshot getSerializerConfigSnapshot() {
-			return serializerConfigSnapshot;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
new file mode 100644
index 0000000..ddfeab4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.util.Arrays;
+
+/**
+ * Dummy TypeSerializer to avoid that data is lost when checkpointing again a serializer for which we encountered
+ * a {@link ClassNotFoundException} or {@link InvalidClassException}.
+ */
+public class UnloadableDummyTypeSerializer<T> extends TypeSerializer<T> {
+
+	private static final long serialVersionUID = 2526330533671642711L;
+	private final byte[] actualBytes;
+
+	public UnloadableDummyTypeSerializer(byte[] actualBytes) {
+		this.actualBytes = Preconditions.checkNotNull(actualBytes);
+	}
+
+	public byte[] getActualBytes() {
+		return actualBytes;
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+	}
+
+	@Override
+	public TypeSerializer<T> duplicate() {
+		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+	}
+
+	@Override
+	public T createInstance() {
+		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+	}
+
+	@Override
+	public T copy(T from) {
+		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+	}
+
+	@Override
+	public int getLength() {
+		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+	}
+
+	@Override
+	public void serialize(T record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+	}
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+	}
+
+	@Override
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+	}
+
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return false;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		UnloadableDummyTypeSerializer<?> that = (UnloadableDummyTypeSerializer<?>) o;
+
+		return Arrays.equals(getActualBytes(), that.getActualBytes());
+	}
+
+	@Override
+	public int hashCode() {
+		return Arrays.hashCode(getActualBytes());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
index 8fa2315..5572985 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
@@ -20,21 +20,23 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 /**
  * Configuration snapshot of a serializer for collection types.
+ *
+ * @param <T> Type of the element.
  */
 @Internal
-public final class CollectionSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+public final class CollectionSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
 
 	private static final int VERSION = 1;
 
 	/** This empty nullary constructor is required for deserializing the configuration. */
 	public CollectionSerializerConfigSnapshot() {}
 
-	public CollectionSerializerConfigSnapshot(TypeSerializerConfigSnapshot elementSerializerConfigSnapshot) {
-		super(elementSerializerConfigSnapshot);
+	public CollectionSerializerConfigSnapshot(TypeSerializer<T> elementSerializer) {
+		super(elementSerializer);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 54c604c..cdfc964 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -23,9 +23,12 @@ import java.lang.reflect.Array;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -196,7 +199,7 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 
 	@Override
 	public GenericArraySerializerConfigSnapshot snapshotConfiguration() {
-		return new GenericArraySerializerConfigSnapshot<>(componentClass, componentSerializer.snapshotConfiguration());
+		return new GenericArraySerializerConfigSnapshot<>(componentClass, componentSerializer);
 	}
 
 	@Override
@@ -205,8 +208,14 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 			final GenericArraySerializerConfigSnapshot config = (GenericArraySerializerConfigSnapshot) configSnapshot;
 
 			if (componentClass.equals(config.getComponentClass())) {
-				CompatibilityResult<C> compatResult = componentSerializer.ensureCompatibility(
-					config.getSingleNestedSerializerConfigSnapshot());
+				Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousComponentSerializerAndConfig =
+					config.getSingleNestedSerializerAndConfig();
+
+				CompatibilityResult<C> compatResult = CompatibilityUtil.resolveCompatibilityResult(
+						previousComponentSerializerAndConfig.f0,
+						UnloadableDummyTypeSerializer.class,
+						previousComponentSerializerAndConfig.f1,
+						componentSerializer);
 
 				if (!compatResult.isRequiresMigration()) {
 					return CompatibilityResult.compatible();

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index e78eb6c..79dcf89 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
@@ -47,9 +47,9 @@ public final class GenericArraySerializerConfigSnapshot<C> extends CompositeType
 
 	public GenericArraySerializerConfigSnapshot(
 			Class<C> componentClass,
-			TypeSerializerConfigSnapshot componentSerializerConfigSnapshot) {
+			TypeSerializer<C> componentSerializer) {
 
-		super(componentSerializerConfigSnapshot);
+		super(componentSerializer);
 
 		this.componentClass = Preconditions.checkNotNull(componentClass);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index 1f271fe..c2b935c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -20,9 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -177,14 +180,20 @@ public final class ListSerializer<T> extends TypeSerializer<List<T>> {
 
 	@Override
 	public CollectionSerializerConfigSnapshot snapshotConfiguration() {
-		return new CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration());
+		return new CollectionSerializerConfigSnapshot<>(elementSerializer);
 	}
 
 	@Override
 	public CompatibilityResult<List<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
 		if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
-			CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility(
-				((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
+			Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousElemSerializerAndConfig =
+				((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
+
+			CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
+					previousElemSerializerAndConfig.f0,
+					UnloadableDummyTypeSerializer.class,
+					previousElemSerializerAndConfig.f1,
+					elementSerializer);
 
 			if (!compatResult.isRequiresMigration()) {
 				return CompatibilityResult.compatible();

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index d5d6ec8..23b494b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -20,14 +20,18 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 
@@ -202,19 +206,26 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
 
 	@Override
 	public MapSerializerConfigSnapshot snapshotConfiguration() {
-		return new MapSerializerConfigSnapshot(
-				keySerializer.snapshotConfiguration(),
-				valueSerializer.snapshotConfiguration());
+		return new MapSerializerConfigSnapshot<>(keySerializer, valueSerializer);
 	}
 
 	@Override
 	public CompatibilityResult<Map<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
 		if (configSnapshot instanceof MapSerializerConfigSnapshot) {
-			TypeSerializerConfigSnapshot[] keyValueSerializerConfigSnapshots =
-				((MapSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
-
-			CompatibilityResult<K> keyCompatResult = keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]);
-			CompatibilityResult<V> valueCompatResult = valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]);
+			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
+				((MapSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+
+			CompatibilityResult<K> keyCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+					previousKvSerializersAndConfigs.get(0).f0,
+					UnloadableDummyTypeSerializer.class,
+					previousKvSerializersAndConfigs.get(0).f1,
+					keySerializer);
+
+			CompatibilityResult<V> valueCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+					previousKvSerializersAndConfigs.get(1).f0,
+					UnloadableDummyTypeSerializer.class,
+					previousKvSerializersAndConfigs.get(1).f1,
+					valueSerializer);
 
 			if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration()) {
 				return CompatibilityResult.compatible();

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
index 38e1254..9db3019 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
@@ -20,25 +20,22 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 /**
  * Configuration snapshot for serializers of maps, containing the
  * configuration snapshot of its key serializer and value serializer.
  */
 @Internal
-public final class MapSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+public final class MapSerializerConfigSnapshot<K, V> extends CompositeTypeSerializerConfigSnapshot {
 
 	private static final int VERSION = 1;
 
 	/** This empty nullary constructor is required for deserializing the configuration. */
 	public MapSerializerConfigSnapshot() {}
 
-	public MapSerializerConfigSnapshot(
-			TypeSerializerConfigSnapshot keySerializerConfigSnapshot,
-			TypeSerializerConfigSnapshot valueSerializerConfigSnapshot) {
-
-		super(keySerializerConfigSnapshot, valueSerializerConfigSnapshot);
+	public MapSerializerConfigSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
+		super(keySerializer, valueSerializer);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index 4373ee0..18ebcd8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -20,14 +20,18 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Either;
 
 import java.io.IOException;
+import java.util.List;
 
 import static org.apache.flink.types.Either.Left;
 import static org.apache.flink.types.Either.Right;
@@ -193,19 +197,26 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
 
 	@Override
 	public EitherSerializerConfigSnapshot snapshotConfiguration() {
-		return new EitherSerializerConfigSnapshot(
-				leftSerializer.snapshotConfiguration(),
-				rightSerializer.snapshotConfiguration());
+		return new EitherSerializerConfigSnapshot<>(leftSerializer, rightSerializer);
 	}
 
 	@Override
 	public CompatibilityResult<Either<L, R>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
 		if (configSnapshot instanceof EitherSerializerConfigSnapshot) {
-			TypeSerializerConfigSnapshot[] leftRightSerializerConfigSnapshots =
-				((EitherSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
-
-			CompatibilityResult<L> leftCompatResult = leftSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[0]);
-			CompatibilityResult<R> rightCompatResult = rightSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[1]);
+			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousLeftRightSerializersAndConfigs =
+				((EitherSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+
+			CompatibilityResult<L> leftCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+					previousLeftRightSerializersAndConfigs.get(0).f0,
+					UnloadableDummyTypeSerializer.class,
+					previousLeftRightSerializersAndConfigs.get(0).f1,
+					leftSerializer);
+
+			CompatibilityResult<R> rightCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+					previousLeftRightSerializersAndConfigs.get(1).f0,
+					UnloadableDummyTypeSerializer.class,
+					previousLeftRightSerializersAndConfigs.get(1).f1,
+					rightSerializer);
 
 			if (!leftCompatResult.isRequiresMigration() && !rightCompatResult.isRequiresMigration()) {
 				return CompatibilityResult.compatible();

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
index 473d438..f996878 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.types.Either;
 
 /**
@@ -28,18 +28,15 @@ import org.apache.flink.types.Either;
  * containing configuration snapshots of the Left and Right serializers.
  */
 @Internal
-public final class EitherSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+public final class EitherSerializerConfigSnapshot<L, R> extends CompositeTypeSerializerConfigSnapshot {
 
 	private static final int VERSION = 1;
 
 	/** This empty nullary constructor is required for deserializing the configuration. */
 	public EitherSerializerConfigSnapshot() {}
 
-	public EitherSerializerConfigSnapshot(
-			TypeSerializerConfigSnapshot leftSerializerConfigSnapshot,
-			TypeSerializerConfigSnapshot rightSerializerConfigSnapshot) {
-
-		super(leftSerializerConfigSnapshot, rightSerializerConfigSnapshot);
+	public EitherSerializerConfigSnapshot(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
+		super(leftSerializer, rightSerializer);
 	}
 
 	@Override