You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/22 15:30:00 UTC

[2/3] flink git commit: [FLINK-6482] [core] Add nested serializers to config snapshots of composite serializers

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index a8368c4..2311158 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -34,13 +34,20 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -134,6 +141,25 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 		this.subclassSerializerCache = new HashMap<>();
 	}
+
+	public PojoSerializer(
+			Class<T> clazz,
+			Field[] fields,
+			TypeSerializer<Object>[] fieldSerializers,
+			LinkedHashMap<Class<?>, Integer> registeredClasses,
+			TypeSerializer<?>[] registeredSerializers,
+			HashMap<Class<?>, TypeSerializer<?>> subclassSerializerCache) {
+
+		this.clazz = checkNotNull(clazz);
+		this.fields = checkNotNull(fields);
+		this.numFields = fields.length;
+		this.fieldSerializers = checkNotNull(fieldSerializers);
+		this.registeredClasses = checkNotNull(registeredClasses);
+		this.registeredSerializers = checkNotNull(registeredSerializers);
+		this.subclassSerializerCache = checkNotNull(subclassSerializerCache);
+
+		this.executionConfig = null;
+	}
 	
 	@Override
 	public boolean isImmutableType() {
@@ -558,6 +584,8 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		if (configSnapshot instanceof PojoSerializerConfigSnapshot) {
 			final PojoSerializerConfigSnapshot<T> config = (PojoSerializerConfigSnapshot<T>) configSnapshot;
 
+			boolean requiresMigration = false;
+
 			if (clazz.equals(config.getTypeClass())) {
 				if (this.numFields == config.getFieldToSerializerConfigSnapshot().size()) {
 
@@ -572,16 +600,27 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 						(TypeSerializer<Object>[]) new TypeSerializer<?>[this.numFields];
 
 					int i = 0;
-					for (Map.Entry<Field, TypeSerializerConfigSnapshot> fieldToConfigSnapshotEntry
+					for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToConfigSnapshotEntry
 							: config.getFieldToSerializerConfigSnapshot().entrySet()) {
 
 						int fieldIndex = findField(fieldToConfigSnapshotEntry.getKey());
 						if (fieldIndex != -1) {
 							reorderedFields[i] = fieldToConfigSnapshotEntry.getKey();
 
-							compatResult = fieldSerializers[fieldIndex].ensureCompatibility(fieldToConfigSnapshotEntry.getValue());
+							compatResult = CompatibilityUtil.resolveCompatibilityResult(
+									fieldToConfigSnapshotEntry.getValue().f0,
+									UnloadableDummyTypeSerializer.class,
+									fieldToConfigSnapshotEntry.getValue().f1,
+									fieldSerializers[fieldIndex]);
+
 							if (compatResult.isRequiresMigration()) {
-								return CompatibilityResult.requiresMigration();
+								requiresMigration = true;
+
+								if (compatResult.getConvertDeserializer() != null) {
+									reorderedFieldSerializers[i] = (TypeSerializer<Object>) compatResult.getConvertDeserializer();
+								} else {
+									return CompatibilityResult.requiresMigration();
+								}
 							} else {
 								reorderedFieldSerializers[i] = fieldSerializers[fieldIndex];
 							}
@@ -599,7 +638,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 					final LinkedHashMap<Class<?>, Integer> reorderedRegisteredSubclassesToClasstags;
 					final TypeSerializer<?>[] reorderedRegisteredSubclassSerializers;
 
-					final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> previousRegistrations =
+					final LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousRegistrations =
 						config.getRegisteredSubclassesToSerializerConfigSnapshots();
 
 					// the reconfigured list of registered subclasses will be the previous registered
@@ -615,11 +654,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 						reorderedRegisteredSubclasses, executionConfig);
 
 					i = 0;
-					for (TypeSerializerConfigSnapshot previousRegisteredSerializerConfig : previousRegistrations.values()) {
+					for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousRegisteredSerializerConfig : previousRegistrations.values()) {
 						// check compatibility of subclass serializer
-						compatResult = reorderedRegisteredSubclassSerializers[i].ensureCompatibility(previousRegisteredSerializerConfig);
+						compatResult = CompatibilityUtil.resolveCompatibilityResult(
+								previousRegisteredSerializerConfig.f0,
+								UnloadableDummyTypeSerializer.class,
+								previousRegisteredSerializerConfig.f1,
+								reorderedRegisteredSubclassSerializers[i]);
+
 						if (compatResult.isRequiresMigration()) {
-							return CompatibilityResult.requiresMigration();
+							requiresMigration = true;
+
+							if (compatResult.getConvertDeserializer() == null) {
+								return CompatibilityResult.requiresMigration();
+							}
 						}
 
 						i++;
@@ -631,15 +679,26 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 					// this won't be applied to this serializer until all compatibility checks have been completed
 					HashMap<Class<?>, TypeSerializer<?>> rebuiltCache = new HashMap<>();
 
-					for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> previousCachedEntry
+					for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousCachedEntry
 							: config.getNonRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) {
 
 						TypeSerializer<?> cachedSerializer = createSubclassSerializer(previousCachedEntry.getKey());
 
 						// check compatibility of cached subclass serializer
-						compatResult = cachedSerializer.ensureCompatibility(previousCachedEntry.getValue());
+						compatResult = CompatibilityUtil.resolveCompatibilityResult(
+								previousCachedEntry.getValue().f0,
+								UnloadableDummyTypeSerializer.class,
+								previousCachedEntry.getValue().f1,
+								cachedSerializer);
+
 						if (compatResult.isRequiresMigration()) {
-							return CompatibilityResult.requiresMigration();
+							requiresMigration = true;
+
+							if (compatResult.getConvertDeserializer() != null) {
+								rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer);
+							} else {
+								return CompatibilityResult.requiresMigration();
+							}
 						} else {
 							rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer);
 						}
@@ -648,15 +707,26 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 					// completed compatibility checks; up to this point, we can just reconfigure
 					// the serializer so that it is compatible and migration is not required
 
-					this.fields = reorderedFields;
-					this.fieldSerializers = reorderedFieldSerializers;
+					if (!requiresMigration) {
+						this.fields = reorderedFields;
+						this.fieldSerializers = reorderedFieldSerializers;
 
-					this.registeredClasses = reorderedRegisteredSubclassesToClasstags;
-					this.registeredSerializers = reorderedRegisteredSubclassSerializers;
+						this.registeredClasses = reorderedRegisteredSubclassesToClasstags;
+						this.registeredSerializers = reorderedRegisteredSubclassSerializers;
 
-					this.subclassSerializerCache = rebuiltCache;
+						this.subclassSerializerCache = rebuiltCache;
 
-					return CompatibilityResult.compatible();
+						return CompatibilityResult.compatible();
+					} else {
+						return CompatibilityResult.requiresMigration(
+							new PojoSerializer<>(
+								clazz,
+								reorderedFields,
+								reorderedFieldSerializers,
+								reorderedRegisteredSubclassesToClasstags,
+								reorderedRegisteredSubclassSerializers,
+								rebuiltCache));
+					}
 				}
 			}
 		}
@@ -669,39 +739,56 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		private static final int VERSION = 1;
 
 		/**
-		 * Ordered map of POJO fields to the configuration snapshots of their corresponding serializers.
+		 * Ordered map of POJO fields to their corresponding serializers and its configuration snapshots.
 		 *
 		 * <p>Ordering of the fields is kept so that new Pojo serializers for previous data
 		 * may reorder the fields in case they are different. The order of the fields need to
 		 * stay the same for binary compatibility, as the field order is part of the serialization format.
 		 */
-		private LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot;
+		private LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot;
 
 		/**
-		 * Ordered map of registered subclasses to the configuration snapshots of their corresponding serializers.
+		 * Ordered map of registered subclasses to their corresponding serializers and its configuration snapshots.
 		 *
 		 * <p>Ordering of the registered subclasses is kept so that new Pojo serializers for previous data
 		 * may retain the same class tag used for registered subclasses. Newly registered subclasses that
 		 * weren't present before should be appended with the next available class tag.
 		 */
-		private LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots;
+		private LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots;
 
 		/**
-		 * Configuration snapshots of previously cached non-registered subclass serializers.
+		 * Previously cached non-registered subclass serializers and its configuration snapshots.
 		 *
 		 * <p>This is kept so that new Pojo serializers may eagerly repopulate their
 		 * cache with reconfigured subclass serializers.
 		 */
-		private HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots;
+		private HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots;
+
+		private boolean ignoreTypeSerializerSerialization;
 
 		/** This empty nullary constructor is required for deserializing the configuration. */
 		public PojoSerializerConfigSnapshot() {}
 
 		public PojoSerializerConfigSnapshot(
 				Class<T> pojoType,
-				LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot,
-				LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots,
-				HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots) {
+				LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+				LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots,
+				HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots) {
+
+			this(
+				pojoType,
+				fieldToSerializerConfigSnapshot,
+				registeredSubclassesToSerializerConfigSnapshots,
+				nonRegisteredSubclassesToSerializerConfigSnapshots,
+				false);
+		}
+
+		public PojoSerializerConfigSnapshot(
+				Class<T> pojoType,
+				LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+				LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots,
+				HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots,
+				boolean ignoreTypeSerializerSerialization) {
 
 			super(pojoType);
 
@@ -711,37 +798,71 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 					Preconditions.checkNotNull(registeredSubclassesToSerializerConfigSnapshots);
 			this.nonRegisteredSubclassesToSerializerConfigSnapshots =
 					Preconditions.checkNotNull(nonRegisteredSubclassesToSerializerConfigSnapshots);
+
+			this.ignoreTypeSerializerSerialization = ignoreTypeSerializerSerialization;
 		}
 
 		@Override
 		public void write(DataOutputView out) throws IOException {
 			super.write(out);
 
-			// --- write fields and their serializers, in order
+			try (
+				ByteArrayOutputStreamWithPos outWithPos = new ByteArrayOutputStreamWithPos();
+				DataOutputViewStreamWrapper outViewWrapper = new DataOutputViewStreamWrapper(outWithPos)) {
 
-			out.writeInt(fieldToSerializerConfigSnapshot.size());
-			for (Map.Entry<Field, TypeSerializerConfigSnapshot> entry
-				: fieldToSerializerConfigSnapshot.entrySet()) {
-				out.writeUTF(entry.getKey().getName());
-				TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
-			}
+				// --- write fields and their serializers, in order
 
-			// --- write registered subclasses and their serializers, in registration order
+				out.writeInt(fieldToSerializerConfigSnapshot.size());
+				for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+						: fieldToSerializerConfigSnapshot.entrySet()) {
 
-			out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size());
-			for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> entry
-					: registeredSubclassesToSerializerConfigSnapshots.entrySet()) {
-				out.writeUTF(entry.getKey().getName());
-				TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
-			}
+					outViewWrapper.writeUTF(entry.getKey().getName());
+
+					out.writeInt(outWithPos.getPosition());
+					if (!ignoreTypeSerializerSerialization) {
+						TypeSerializerSerializationUtil.writeSerializer(outViewWrapper, entry.getValue().f0);
+					}
+
+					out.writeInt(outWithPos.getPosition());
+					TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, entry.getValue().f1);
+				}
+
+				// --- write registered subclasses and their serializers, in registration order
+
+				out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size());
+				for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+						: registeredSubclassesToSerializerConfigSnapshots.entrySet()) {
+
+					outViewWrapper.writeUTF(entry.getKey().getName());
+
+					out.writeInt(outWithPos.getPosition());
+					if (!ignoreTypeSerializerSerialization) {
+						TypeSerializerSerializationUtil.writeSerializer(outViewWrapper, entry.getValue().f0);
+					}
+
+					out.writeInt(outWithPos.getPosition());
+					TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, entry.getValue().f1);
+				}
+
+				// --- write snapshot of non-registered subclass serializer cache
+
+				out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size());
+				for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+						: nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) {
+
+					outViewWrapper.writeUTF(entry.getKey().getName());
+
+					out.writeInt(outWithPos.getPosition());
+					if (!ignoreTypeSerializerSerialization) {
+						TypeSerializerSerializationUtil.writeSerializer(outViewWrapper, entry.getValue().f0);
+					}
 
-			// --- write snapshot of non-registered subclass serializer cache
+					out.writeInt(outWithPos.getPosition());
+					TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, entry.getValue().f1);
+				}
 
-			out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size());
-			for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> entry
-					: nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) {
-				out.writeUTF(entry.getKey().getName());
-				TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
+				out.writeInt(outWithPos.getPosition());
+				out.write(outWithPos.getBuf(), 0 , outWithPos.getPosition());
 			}
 		}
 
@@ -749,74 +870,126 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		public void read(DataInputView in) throws IOException {
 			super.read(in);
 
-			// --- read fields and their serializers, in order
-
 			int numFields = in.readInt();
-			this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields);
-			String fieldName;
-			Field field;
+			int[] fieldSerializerOffsets = new int[numFields * 2];
 			for (int i = 0; i < numFields; i++) {
-				fieldName = in.readUTF();
+				fieldSerializerOffsets[i * 2] = in.readInt();
+				fieldSerializerOffsets[i * 2 + 1] = in.readInt();
+			}
 
-				// search all superclasses for the field
-				Class<?> clazz = getTypeClass();
-				field = null;
-				while (clazz != null) {
-					try {
-						field = clazz.getDeclaredField(fieldName);
-						field.setAccessible(true);
-						break;
-					} catch (NoSuchFieldException e) {
-						clazz = clazz.getSuperclass();
+
+			int numRegisteredSubclasses = in.readInt();
+			int[] registeredSerializerOffsets = new int[numRegisteredSubclasses * 2];
+			for (int i = 0; i < numRegisteredSubclasses; i++) {
+				registeredSerializerOffsets[i * 2] = in.readInt();
+				registeredSerializerOffsets[i * 2 + 1] = in.readInt();
+			}
+
+			int numCachedSubclassSerializers = in.readInt();
+			int[] cachedSerializerOffsets = new int[numCachedSubclassSerializers * 2];
+			for (int i = 0; i < numCachedSubclassSerializers; i++) {
+				cachedSerializerOffsets[i * 2] = in.readInt();
+				cachedSerializerOffsets[i * 2 + 1] = in.readInt();
+			}
+
+			int totalBytes = in.readInt();
+			byte[] buffer = new byte[totalBytes];
+			in.readFully(buffer);
+
+			try (
+				ByteArrayInputStreamWithPos inWithPos = new ByteArrayInputStreamWithPos(buffer);
+				DataInputViewStreamWrapper inViewWrapper = new DataInputViewStreamWrapper(inWithPos)) {
+
+				// --- read fields and their serializers, in order
+
+				this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields);
+				String fieldName;
+				Field field;
+				TypeSerializer<?> fieldSerializer;
+				TypeSerializerConfigSnapshot fieldSerializerConfigSnapshot;
+				for (int i = 0; i < numFields; i++) {
+					fieldName = inViewWrapper.readUTF();
+
+					// search all superclasses for the field
+					Class<?> clazz = getTypeClass();
+					field = null;
+					while (clazz != null) {
+						try {
+							field = clazz.getDeclaredField(fieldName);
+							field.setAccessible(true);
+							break;
+						} catch (NoSuchFieldException e) {
+							clazz = clazz.getSuperclass();
+						}
 					}
-				}
 
-				if (field == null) {
-					// the field no longer exists in the POJO
-					throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName());
-				} else {
-					fieldToSerializerConfigSnapshot.put(
+					if (field == null) {
+						// the field no longer exists in the POJO
+						throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName());
+					} else {
+						inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
+						fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
+
+						inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
+						fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
+
+						fieldToSerializerConfigSnapshot.put(
 							field,
-							TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+							new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, fieldSerializerConfigSnapshot));
+					}
 				}
-			}
 
-			// --- read registered subclasses and their serializers, in registration order
+				// --- read registered subclasses and their serializers, in registration order
 
-			int numRegisteredSubclasses = in.readInt();
-			this.registeredSubclassesToSerializerConfigSnapshots = new LinkedHashMap<>(numRegisteredSubclasses);
-			String registeredSubclassname;
-			Class<?> registeredSubclass;
-			for (int i = 0; i < numRegisteredSubclasses; i++) {
-				registeredSubclassname = in.readUTF();
-				try {
-					registeredSubclass = Class.forName(registeredSubclassname, true, getUserCodeClassLoader());
-				} catch (ClassNotFoundException e) {
-					throw new IOException("Cannot find requested class " + registeredSubclassname + " in classpath.", e);
-				}
+				this.registeredSubclassesToSerializerConfigSnapshots = new LinkedHashMap<>(numRegisteredSubclasses);
+				String registeredSubclassname;
+				Class<?> registeredSubclass;
+				TypeSerializer<?> registeredSubclassSerializer;
+				TypeSerializerConfigSnapshot registeredSubclassSerializerConfigSnapshot;
+				for (int i = 0; i < numRegisteredSubclasses; i++) {
+					registeredSubclassname = inViewWrapper.readUTF();
+					try {
+						registeredSubclass = Class.forName(registeredSubclassname, true, getUserCodeClassLoader());
+					} catch (ClassNotFoundException e) {
+						throw new IOException("Cannot find requested class " + registeredSubclassname + " in classpath.", e);
+					}
 
-				this.registeredSubclassesToSerializerConfigSnapshots.put(
-						registeredSubclass,
-						TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
-			}
+					inWithPos.setPosition(registeredSerializerOffsets[i * 2]);
+					registeredSubclassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
 
-			// --- read snapshot of non-registered subclass serializer cache
+					inWithPos.setPosition(registeredSerializerOffsets[i * 2 + 1]);
+					registeredSubclassSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
 
-			int numCachedSubclassSerializers = in.readInt();
-			this.nonRegisteredSubclassesToSerializerConfigSnapshots = new HashMap<>(numCachedSubclassSerializers);
-			String cachedSubclassname;
-			Class<?> cachedSubclass;
-			for (int i = 0; i < numCachedSubclassSerializers; i++) {
-				cachedSubclassname = in.readUTF();
-				try {
-					cachedSubclass = Class.forName(cachedSubclassname, true, getUserCodeClassLoader());
-				} catch (ClassNotFoundException e) {
-					throw new IOException("Cannot find requested class " + cachedSubclassname + " in classpath.", e);
+					this.registeredSubclassesToSerializerConfigSnapshots.put(
+						registeredSubclass,
+						new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(registeredSubclassSerializer, registeredSubclassSerializerConfigSnapshot));
 				}
 
-				this.nonRegisteredSubclassesToSerializerConfigSnapshots.put(
+				// --- read snapshot of non-registered subclass serializer cache
+
+				this.nonRegisteredSubclassesToSerializerConfigSnapshots = new HashMap<>(numCachedSubclassSerializers);
+				String cachedSubclassname;
+				Class<?> cachedSubclass;
+				TypeSerializer<?> cachedSubclassSerializer;
+				TypeSerializerConfigSnapshot cachedSubclassSerializerConfigSnapshot;
+				for (int i = 0; i < numCachedSubclassSerializers; i++) {
+					cachedSubclassname = inViewWrapper.readUTF();
+					try {
+						cachedSubclass = Class.forName(cachedSubclassname, true, getUserCodeClassLoader());
+					} catch (ClassNotFoundException e) {
+						throw new IOException("Cannot find requested class " + cachedSubclassname + " in classpath.", e);
+					}
+
+					inWithPos.setPosition(cachedSerializerOffsets[i * 2]);
+					cachedSubclassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
+
+					inWithPos.setPosition(cachedSerializerOffsets[i * 2 + 1]);
+					cachedSubclassSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
+
+					this.nonRegisteredSubclassesToSerializerConfigSnapshots.put(
 						cachedSubclass,
-						TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+						new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(cachedSubclassSerializer, cachedSubclassSerializerConfigSnapshot));
+				}
 			}
 		}
 
@@ -825,15 +998,15 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			return VERSION;
 		}
 
-		public LinkedHashMap<Field, TypeSerializerConfigSnapshot> getFieldToSerializerConfigSnapshot() {
+		public LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() {
 			return fieldToSerializerConfigSnapshot;
 		}
 
-		public LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> getRegisteredSubclassesToSerializerConfigSnapshots() {
+		public LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getRegisteredSubclassesToSerializerConfigSnapshots() {
 			return registeredSubclassesToSerializerConfigSnapshots;
 		}
 
-		public HashMap<Class<?>, TypeSerializerConfigSnapshot> getNonRegisteredSubclassesToSerializerConfigSnapshots() {
+		public HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getNonRegisteredSubclassesToSerializerConfigSnapshots() {
 			return nonRegisteredSubclassesToSerializerConfigSnapshots;
 		}
 
@@ -1001,27 +1174,35 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			TypeSerializer<?>[] fieldSerializers,
 			HashMap<Class<?>, TypeSerializer<?>> nonRegisteredSubclassSerializerCache) {
 
-		final LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshots =
+		final LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots =
 			new LinkedHashMap<>(fields.length);
 
 		for (int i = 0; i < fields.length; i++) {
-			fieldToSerializerConfigSnapshots.put(fields[i], fieldSerializers[i].snapshotConfiguration());
+			fieldToSerializerConfigSnapshots.put(
+				fields[i],
+				new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializers[i], fieldSerializers[i].snapshotConfiguration()));
 		}
 
-		final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots =
+		final LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots =
 				new LinkedHashMap<>(registeredSubclassesToTags.size());
 
 		for (Map.Entry<Class<?>, Integer> entry : registeredSubclassesToTags.entrySet()) {
 			registeredSubclassesToSerializerConfigSnapshots.put(
 					entry.getKey(),
-					registeredSubclassSerializers[entry.getValue()].snapshotConfiguration());
+					new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+						registeredSubclassSerializers[entry.getValue()],
+						registeredSubclassSerializers[entry.getValue()].snapshotConfiguration()));
 		}
 
-		final HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots =
+		final HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots =
 				new LinkedHashMap<>(nonRegisteredSubclassSerializerCache.size());
 
 		for (Map.Entry<Class<?>, TypeSerializer<?>> entry : nonRegisteredSubclassSerializerCache.entrySet()) {
-			nonRegisteredSubclassesToSerializerConfigSnapshots.put(entry.getKey(), entry.getValue().snapshotConfiguration());
+			nonRegisteredSubclassesToSerializerConfigSnapshots.put(
+				entry.getKey(),
+				new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+					entry.getValue(),
+					entry.getValue().snapshotConfiguration()));
 		}
 
 		return new PojoSerializerConfigSnapshot<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index ba41d4b..bd08b04 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -19,11 +19,13 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Row;
@@ -31,6 +33,7 @@ import org.apache.flink.types.Row;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
 import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
@@ -254,22 +257,28 @@ public final class RowSerializer extends TypeSerializer<Row> {
 
 	@Override
 	public RowSerializerConfigSnapshot snapshotConfiguration() {
-		return new RowSerializerConfigSnapshot(TypeSerializerUtil.snapshotConfigurations(fieldSerializers));
+		return new RowSerializerConfigSnapshot(fieldSerializers);
 	}
 
 	@Override
 	public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
 		if (configSnapshot instanceof RowSerializerConfigSnapshot) {
-			TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots =
-				((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
+			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousFieldSerializersAndConfigs =
+				((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
 
-			if (fieldSerializerConfigSnapshots.length == fieldSerializers.length) {
+			if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) {
 				boolean requireMigration = false;
 				TypeSerializer<?>[] convertDeserializers = new TypeSerializer<?>[fieldSerializers.length];
 
 				CompatibilityResult<?> compatResult;
-				for (int i = 0; i < fieldSerializers.length; i++) {
-					compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
+				int i = 0;
+				for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> f : previousFieldSerializersAndConfigs) {
+					compatResult = CompatibilityUtil.resolveCompatibilityResult(
+							f.f0,
+							UnloadableDummyTypeSerializer.class,
+							f.f1,
+							fieldSerializers[i]);
+
 					if (compatResult.isRequiresMigration()) {
 						requireMigration = true;
 
@@ -281,6 +290,8 @@ public final class RowSerializer extends TypeSerializer<Row> {
 								new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
 						}
 					}
+
+					i++;
 				}
 
 				if (requireMigration) {
@@ -301,8 +312,8 @@ public final class RowSerializer extends TypeSerializer<Row> {
 		/** This empty nullary constructor is required for deserializing the configuration. */
 		public RowSerializerConfigSnapshot() {}
 
-		public RowSerializerConfigSnapshot(TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) {
-			super(fieldSerializerConfigSnapshots);
+		public RowSerializerConfigSnapshot(TypeSerializer[] fieldSerializers) {
+			super(fieldSerializers);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index f485c3e..911c96f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -156,4 +156,9 @@ public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> {
 			throw new RuntimeException("Cannot instantiate tuple.", e);
 		}
 	}
+
+	@Override
+	protected TupleSerializerBase<T> createSerializerInstance(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
+		return new TupleSerializer<>(tupleClass, fieldSerializers);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 032c3f1..f12dcd9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -20,14 +20,18 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -125,9 +129,7 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
 	@Override
 	public TupleSerializerConfigSnapshot<T> snapshotConfiguration() {
-		return new TupleSerializerConfigSnapshot<>(
-				tupleClass,
-				TypeSerializerUtil.snapshotConfigurations(fieldSerializers));
+		return new TupleSerializerConfigSnapshot<>(tupleClass, fieldSerializers);
 	}
 
 	@SuppressWarnings("unchecked")
@@ -137,24 +139,48 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 			final TupleSerializerConfigSnapshot<T> config = (TupleSerializerConfigSnapshot<T>) configSnapshot;
 
 			if (tupleClass.equals(config.getTupleClass())) {
-				TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots =
-					((TupleSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
+				List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousFieldSerializersAndConfigs =
+					((TupleSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+
+				if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) {
+
+					TypeSerializer<Object>[] convertFieldSerializers = new TypeSerializer[fieldSerializers.length];
+					boolean requiresMigration = false;
+					CompatibilityResult<Object> compatResult;
+					int i = 0;
+					for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> f : previousFieldSerializersAndConfigs) {
+						compatResult = CompatibilityUtil.resolveCompatibilityResult(
+								f.f0,
+								UnloadableDummyTypeSerializer.class,
+								f.f1,
+								fieldSerializers[i]);
 
-				if (fieldSerializerConfigSnapshots.length == fieldSerializers.length) {
-
-					CompatibilityResult compatResult;
-					for (int i = 0; i < fieldSerializers.length; i++) {
-						compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
 						if (compatResult.isRequiresMigration()) {
-							return CompatibilityResult.requiresMigration();
+							requiresMigration = true;
+
+							if (compatResult.getConvertDeserializer() != null) {
+								convertFieldSerializers[i] =
+									new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
+							} else {
+								return CompatibilityResult.requiresMigration();
+							}
 						}
+
+						i++;
 					}
 
-					return CompatibilityResult.compatible();
+					if (!requiresMigration) {
+						return CompatibilityResult.compatible();
+					} else {
+						return CompatibilityResult.requiresMigration(
+							createSerializerInstance(tupleClass, convertFieldSerializers));
+					}
 				}
 			}
 		}
 
 		return CompatibilityResult.requiresMigration();
 	}
+
+	protected abstract TupleSerializerBase<T> createSerializerInstance(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index 6d2bb5f..1e7701c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
@@ -41,11 +41,8 @@ public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSeriali
 	/** This empty nullary constructor is required for deserializing the configuration. */
 	public TupleSerializerConfigSnapshot() {}
 
-	public TupleSerializerConfigSnapshot(
-			Class<T> tupleClass,
-			TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) {
-
-		super(fieldSerializerConfigSnapshots);
+	public TupleSerializerConfigSnapshot(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
+		super(fieldSerializers);
 
 		this.tupleClass = Preconditions.checkNotNull(tupleClass);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 73c4379..57015c7 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -109,14 +109,14 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerUtil.writeSerializerConfigSnapshot(
+			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
 				new DataOutputViewStreamWrapper(out), configSnapshot);
 			serializedConfig = out.toByteArray();
 		}
 
 		TypeSerializerConfigSnapshot restoredConfig;
 		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			restoredConfig = TypeSerializerUtil.readSerializerConfigSnapshot(
+			restoredConfig = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
deleted file mode 100644
index 0783bb6..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * Unit tests related to {@link TypeSerializerConfigSnapshot}.
- */
-public class TypeSerializerConfigSnapshotTest {
-
-	/**
-	 * Verifies that reading and writing configuration snapshots work correctly.
-	 */
-	@Test
-	public void testSerializeConfigurationSnapshots() throws Exception {
-		TestConfigSnapshot configSnapshot1 = new TestConfigSnapshot(1, "foo");
-		TestConfigSnapshot configSnapshot2 = new TestConfigSnapshot(2, "bar");
-
-		byte[] serializedConfig;
-		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerUtil.writeSerializerConfigSnapshots(
-				new DataOutputViewStreamWrapper(out),
-				configSnapshot1,
-				configSnapshot2);
-
-			serializedConfig = out.toByteArray();
-		}
-
-		TypeSerializerConfigSnapshot[] restoredConfigs;
-		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			restoredConfigs = TypeSerializerUtil.readSerializerConfigSnapshots(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
-		}
-
-		assertEquals(2, restoredConfigs.length);
-		assertEquals(configSnapshot1, restoredConfigs[0]);
-		assertEquals(configSnapshot2, restoredConfigs[1]);
-	}
-
-	/**
-	 * Verifies that deserializing config snapshots fail if the config class could not be found.
-	 */
-	@Test
-	public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception {
-		byte[] serializedConfig;
-		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerUtil.writeSerializerConfigSnapshot(
-				new DataOutputViewStreamWrapper(out), new TestConfigSnapshot(123, "foobar"));
-			serializedConfig = out.toByteArray();
-		}
-
-		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			// read using a dummy classloader
-			TypeSerializerUtil.readSerializerConfigSnapshot(
-				new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null));
-			fail("Expected a ClassNotFoundException wrapped in IOException");
-		} catch (IOException expected) {
-			// test passes
-		}
-	}
-
-	public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot {
-
-		static final int VERSION = 1;
-
-		private int val;
-		private String msg;
-
-		public TestConfigSnapshot() {}
-
-		public TestConfigSnapshot(int val, String msg) {
-			this.val = val;
-			this.msg = msg;
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			super.write(out);
-			out.writeInt(val);
-			out.writeUTF(msg);
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-			super.read(in);
-			val = in.readInt();
-			msg = in.readUTF();
-		}
-
-		@Override
-		public int getVersion() {
-			return VERSION;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj == this) {
-				return true;
-			}
-
-			if (obj == null) {
-				return false;
-			}
-
-			if (obj instanceof TestConfigSnapshot) {
-				return val == ((TestConfigSnapshot) obj).val && msg.equals(((TestConfigSnapshot) obj).msg);
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public int hashCode() {
-			return 31 * val + msg.hashCode();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
deleted file mode 100644
index db1b4ef..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils;
-
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.IOException;
-import java.io.InvalidClassException;
-import java.net.URL;
-import java.net.URLClassLoader;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(InstantiationUtil.class)
-public class TypeSerializerSerializationProxyTest {
-
-	@Test
-	public void testStateSerializerSerializationProxy() throws Exception {
-
-		TypeSerializer<?> serializer = IntSerializer.INSTANCE;
-
-		TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
-
-		byte[] serialized;
-		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
-			proxy.write(new DataOutputViewStreamWrapper(out));
-			serialized = out.toByteArray();
-		}
-
-		proxy = new TypeSerializerSerializationProxy<>(Thread.currentThread().getContextClassLoader());
-
-		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
-			proxy.read(new DataInputViewStreamWrapper(in));
-		}
-
-		Assert.assertEquals(serializer, proxy.getTypeSerializer());
-	}
-
-	@Test
-	public void testStateSerializerSerializationProxyClassNotFound() throws Exception {
-
-		TypeSerializer<?> serializer = IntSerializer.INSTANCE;
-
-		TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
-
-		byte[] serialized;
-		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
-			proxy.write(new DataOutputViewStreamWrapper(out));
-			serialized = out.toByteArray();
-		}
-
-		proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null));
-
-		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
-			proxy.read(new DataInputViewStreamWrapper(in));
-			fail("ClassNotFoundException expected, leading to IOException");
-		} catch (IOException expected) {
-
-		}
-
-		proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null), true);
-
-		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
-			proxy.read(new DataInputViewStreamWrapper(in));
-		}
-
-		Assert.assertTrue(proxy.getTypeSerializer() instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer);
-
-		Assert.assertArrayEquals(
-				InstantiationUtil.serializeObject(serializer),
-				((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) proxy.getTypeSerializer()).getActualBytes());
-	}
-
-	@Test
-	public void testStateSerializerSerializationProxyInvalidClass() throws Exception {
-
-		TypeSerializer<?> serializer = IntSerializer.INSTANCE;
-
-		TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
-
-		byte[] serialized;
-		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
-			proxy.write(new DataOutputViewStreamWrapper(out));
-			serialized = out.toByteArray();
-		}
-
-		PowerMockito.spy(InstantiationUtil.class);
-		PowerMockito
-			.doThrow(new InvalidClassException("test invalid class exception"))
-			.when(InstantiationUtil.class, "deserializeObject", any(byte[].class), any(ClassLoader.class));
-
-		proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null));
-
-		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
-			proxy.read(new DataInputViewStreamWrapper(in));
-			fail("InvalidClassException expected, leading to IOException");
-		} catch (IOException expected) {
-
-		}
-
-		proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null), true);
-
-		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
-			proxy.read(new DataInputViewStreamWrapper(in));
-		}
-
-		Assert.assertTrue(proxy.getTypeSerializer() instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer);
-
-		Assert.assertArrayEquals(
-			InstantiationUtil.serializeObject(serializer),
-			((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) proxy.getTypeSerializer()).getActualBytes());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
new file mode 100644
index 0000000..738644b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link TypeSerializerSerializationUtil}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TypeSerializerSerializationUtil.class)
+public class TypeSerializerSerializationUtilTest {
+
+	/**
+	 * Verifies that reading and writing serializers work correctly.
+	 */
+	@Test
+	public void testSerializerSerialization() throws Exception {
+
+		TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
+			serialized = out.toByteArray();
+		}
+
+		TypeSerializer<?> deserializedSerializer;
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		Assert.assertEquals(serializer, deserializedSerializer);
+	}
+
+	/**
+	 * Verifies deserialization failure cases when reading a serializer from bytes, in the
+	 * case of a {@link ClassNotFoundException}.
+	 */
+	@Test
+	public void testSerializerSerializationWithClassNotFound() throws Exception {
+
+		TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
+			serialized = out.toByteArray();
+		}
+
+		TypeSerializer<?> deserializedSerializer;
+
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
+				new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null));
+		}
+		Assert.assertEquals(null, deserializedSerializer);
+
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
+				new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null), true);
+		}
+		Assert.assertTrue(deserializedSerializer instanceof UnloadableDummyTypeSerializer);
+
+		Assert.assertArrayEquals(
+				InstantiationUtil.serializeObject(serializer),
+				((UnloadableDummyTypeSerializer<?>) deserializedSerializer).getActualBytes());
+	}
+
+	/**
+	 * Verifies deserialization failure cases when reading a serializer from bytes, in the
+	 * case of a {@link InvalidClassException}.
+	 */
+	@Test
+	public void testSerializerSerializationWithInvalidClass() throws Exception {
+
+		TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
+			serialized = out.toByteArray();
+		}
+
+		TypeSerializer<?> deserializedSerializer;
+
+		// mock failure when deserializing serializers
+		TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+				mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
+		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+		PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+		Assert.assertEquals(null, deserializedSerializer);
+	}
+
+	/**
+	 * Verifies that reading and writing configuration snapshots work correctly.
+	 */
+	@Test
+	public void testSerializeConfigurationSnapshots() throws Exception {
+		TypeSerializerSerializationUtilTest.TestConfigSnapshot configSnapshot1 =
+			new TypeSerializerSerializationUtilTest.TestConfigSnapshot(1, "foo");
+
+		TypeSerializerSerializationUtilTest.TestConfigSnapshot configSnapshot2 =
+			new TypeSerializerSerializationUtilTest.TestConfigSnapshot(2, "bar");
+
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerSerializationUtil.writeSerializerConfigSnapshots(
+				new DataOutputViewStreamWrapper(out),
+				configSnapshot1,
+				configSnapshot2);
+
+			serializedConfig = out.toByteArray();
+		}
+
+		TypeSerializerConfigSnapshot[] restoredConfigs;
+		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			restoredConfigs = TypeSerializerSerializationUtil.readSerializerConfigSnapshots(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		assertEquals(2, restoredConfigs.length);
+		assertEquals(configSnapshot1, restoredConfigs[0]);
+		assertEquals(configSnapshot2, restoredConfigs[1]);
+	}
+
+	/**
+	 * Verifies that deserializing config snapshots fail if the config class could not be found.
+	 */
+	@Test
+	public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception {
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
+				new DataOutputViewStreamWrapper(out), new TypeSerializerSerializationUtilTest.TestConfigSnapshot(123, "foobar"));
+			serializedConfig = out.toByteArray();
+		}
+
+		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			// read using a dummy classloader
+			TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
+				new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null));
+			fail("Expected a ClassNotFoundException wrapped in IOException");
+		} catch (IOException expected) {
+			// test passes
+		}
+	}
+
+	/**
+	 * Verifies resilience to serializer deserialization failures when writing and reading
+	 * serializer and config snapshot pairs.
+	 */
+	@Test
+	public void testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures() throws Exception {
+		List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs = Arrays.asList(
+			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+				IntSerializer.INSTANCE, IntSerializer.INSTANCE.snapshotConfiguration()),
+			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+				DoubleSerializer.INSTANCE, DoubleSerializer.INSTANCE.snapshotConfiguration()));
+
+		byte[] serializedSerializersAndConfigs;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+					new DataOutputViewStreamWrapper(out), serializersAndConfigs);
+			serializedSerializersAndConfigs = out.toByteArray();
+		}
+
+		// mock failure when deserializing serializers
+		TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+				mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
+		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+		PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+		List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> restored;
+		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedSerializersAndConfigs)) {
+			restored = TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		Assert.assertEquals(2, restored.size());
+		Assert.assertEquals(null, restored.get(0).f0);
+		Assert.assertEquals(IntSerializer.INSTANCE.snapshotConfiguration(), restored.get(0).f1);
+		Assert.assertEquals(null, restored.get(1).f0);
+		Assert.assertEquals(DoubleSerializer.INSTANCE.snapshotConfiguration(), restored.get(1).f1);
+	}
+
+	public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot {
+
+		static final int VERSION = 1;
+
+		private int val;
+		private String msg;
+
+		public TestConfigSnapshot() {}
+
+		public TestConfigSnapshot(int val, String msg) {
+			this.val = val;
+			this.msg = msg;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+			out.writeInt(val);
+			out.writeUTF(msg);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+			val = in.readInt();
+			msg = in.readUTF();
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj == null) {
+				return false;
+			}
+
+			if (obj instanceof TypeSerializerSerializationUtilTest.TestConfigSnapshot) {
+				return val == ((TypeSerializerSerializationUtilTest.TestConfigSnapshot) obj).val
+					&& msg.equals(((TypeSerializerSerializationUtilTest.TestConfigSnapshot) obj).msg);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * val + msg.hashCode();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
index 16ea945..e3ce3ee 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.common.typeutils.base;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
@@ -95,14 +95,14 @@ public class EnumSerializerTest extends TestLogger {
 
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerUtil.writeSerializerConfigSnapshot(
+			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
 				new DataOutputViewStreamWrapper(out), serializer.snapshotConfiguration());
 			serializedConfig = out.toByteArray();
 		}
 
 		TypeSerializerConfigSnapshot restoredConfig;
 		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			restoredConfig = TypeSerializerUtil.readSerializerConfigSnapshot(
+			restoredConfig = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index c77ffcc..10f4708 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -20,12 +20,14 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 
@@ -39,8 +41,9 @@ import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -50,14 +53,23 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 
 /**
  * A test for the {@link PojoSerializer}.
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TypeSerializerSerializationUtil.class)
 public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.TestUserClass> {
 	private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class);
 
@@ -286,7 +298,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer1.snapshotConfiguration();
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -295,7 +307,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 
 		// read configuration again from bytes
 		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+			pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
 		}
 
@@ -322,7 +334,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -335,7 +347,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 
 		// read configuration from bytes
 		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+			pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
 		}
 
@@ -368,7 +380,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -378,7 +390,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 
 		// read configuration from bytes
 		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+			pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
 		}
 
@@ -426,7 +438,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -438,7 +450,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 
 		// read configuration from bytes
 		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+			pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
 		}
 
@@ -472,14 +484,38 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		// creating this serializer just for generating config snapshots of the field serializers
 		PojoSerializer<TestUserClass> ser = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
 
-		LinkedHashMap<Field, TypeSerializerConfigSnapshot> mockOriginalFieldToSerializerConfigSnapshot =
+		LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot =
 			new LinkedHashMap<>(mockOriginalFieldOrder.length);
-		mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[0], ser.getFieldSerializers()[3].snapshotConfiguration());
-		mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[1], ser.getFieldSerializers()[2].snapshotConfiguration());
-		mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[2], ser.getFieldSerializers()[5].snapshotConfiguration());
-		mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[3], ser.getFieldSerializers()[0].snapshotConfiguration());
-		mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[4], ser.getFieldSerializers()[1].snapshotConfiguration());
-		mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[5], ser.getFieldSerializers()[4].snapshotConfiguration());
+		mockOriginalFieldToSerializerConfigSnapshot.put(
+			mockOriginalFieldOrder[0],
+			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+				ser.getFieldSerializers()[3],
+				ser.getFieldSerializers()[3].snapshotConfiguration()));
+		mockOriginalFieldToSerializerConfigSnapshot.put(
+			mockOriginalFieldOrder[1],
+			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+				ser.getFieldSerializers()[2],
+				ser.getFieldSerializers()[2].snapshotConfiguration()));
+		mockOriginalFieldToSerializerConfigSnapshot.put(
+			mockOriginalFieldOrder[2],
+			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+				ser.getFieldSerializers()[5],
+				ser.getFieldSerializers()[5].snapshotConfiguration()));
+		mockOriginalFieldToSerializerConfigSnapshot.put(
+			mockOriginalFieldOrder[3],
+			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+				ser.getFieldSerializers()[0],
+				ser.getFieldSerializers()[0].snapshotConfiguration()));
+		mockOriginalFieldToSerializerConfigSnapshot.put(
+			mockOriginalFieldOrder[4],
+			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+				ser.getFieldSerializers()[1],
+				ser.getFieldSerializers()[1].snapshotConfiguration()));
+		mockOriginalFieldToSerializerConfigSnapshot.put(
+			mockOriginalFieldOrder[5],
+			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+				ser.getFieldSerializers()[4],
+				ser.getFieldSerializers()[4].snapshotConfiguration()));
 
 		PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
 
@@ -494,13 +530,11 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 			new PojoSerializer.PojoSerializerConfigSnapshot<>(
 				TestUserClass.class,
 				mockOriginalFieldToSerializerConfigSnapshot, // this mocks the previous field order
-				new LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot>(), // empty; irrelevant for this test
-				new HashMap<Class<?>, TypeSerializerConfigSnapshot>()); // empty; irrelevant for this test
+				new LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>(), // empty; irrelevant for this test
+				new HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>()); // empty; irrelevant for this test
 
 		// reconfigure - check reconfiguration result and that fields are reordered to the previous order
-		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(
-
-			mockPreviousConfigSnapshot);
+		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(mockPreviousConfigSnapshot);
 		assertFalse(compatResult.isRequiresMigration());
 		int i = 0;
 		for (Field field : mockOriginalFieldOrder) {
@@ -508,4 +542,74 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 			i++;
 		}
 	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testSerializerSerializationFailureResilience() throws Exception{
+		PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+		// snapshot configuration and serialize to bytes
+		PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> config = pojoSerializer.snapshotConfiguration();
+		byte[] serializedConfig;
+		try (
+			ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), config);
+			serializedConfig = out.toByteArray();
+		}
+
+		// mock failure when deserializing serializers
+		TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+			mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
+		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+		PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+		// read configuration from bytes
+		PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> deserializedConfig;
+		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			deserializedConfig = (PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass>)
+				TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
+					new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		Assert.assertFalse(pojoSerializer.ensureCompatibility(deserializedConfig).isRequiresMigration());
+		verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(config, deserializedConfig);
+	}
+
+	private static void verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(
+			PojoSerializer.PojoSerializerConfigSnapshot<?> original,
+			PojoSerializer.PojoSerializerConfigSnapshot<?> deserializedConfig) {
+
+		LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs =
+				original.getFieldToSerializerConfigSnapshot();
+		for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+				: deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) {
+
+			Assert.assertEquals(null, entry.getValue().f0);
+
+			if (entry.getValue().f1 instanceof PojoSerializer.PojoSerializerConfigSnapshot) {
+				verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(
+					(PojoSerializer.PojoSerializerConfigSnapshot<?>) originalFieldSerializersAndConfs.get(entry.getKey()).f1,
+					(PojoSerializer.PojoSerializerConfigSnapshot<?>) entry.getValue().f1);
+			} else {
+				Assert.assertEquals(originalFieldSerializersAndConfs.get(entry.getKey()).f1, entry.getValue().f1);
+			}
+		}
+
+		LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalRegistrations =
+				original.getRegisteredSubclassesToSerializerConfigSnapshots();
+
+		for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+				: deserializedConfig.getRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) {
+
+			Assert.assertEquals(null, entry.getValue().f0);
+
+			if (entry.getValue().f1 instanceof PojoSerializer.PojoSerializerConfigSnapshot) {
+				verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(
+					(PojoSerializer.PojoSerializerConfigSnapshot<?>) originalRegistrations.get(entry.getKey()).f1,
+					(PojoSerializer.PojoSerializerConfigSnapshot<?>) entry.getValue().f1);
+			} else {
+				Assert.assertEquals(originalRegistrations.get(entry.getKey()).f1, entry.getValue().f1);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 860c560..5a404bd 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -25,7 +25,7 @@ import com.esotericsoftware.kryo.io.Output;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.junit.Test;
@@ -53,7 +53,7 @@ public class KryoSerializerCompatibilityTest {
 		TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializerForA.snapshotConfiguration();
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -61,7 +61,7 @@ public class KryoSerializerCompatibilityTest {
 
 		// read configuration again from bytes
 		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			kryoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+			kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
 		}
 
@@ -91,7 +91,7 @@ public class KryoSerializerCompatibilityTest {
 		TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializer.snapshotConfiguration();
 		byte[] serializedConfig;
 		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-			TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+			TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
 			serializedConfig = out.toByteArray();
 		}
 
@@ -104,7 +104,7 @@ public class KryoSerializerCompatibilityTest {
 
 		// read configuration from bytes
 		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
-			kryoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+			kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
 		}