You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/07 19:47:03 UTC

[5/8] flink git commit: [FLINK-6178] [core] Allow serializer upgrades for managed state

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 1a9c8f9..08da49e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -23,22 +23,25 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -53,22 +56,55 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	private final Class<T> clazz;
+	// --------------------------------------------------------------------------------------------
+	// PojoSerializer parameters
+	// --------------------------------------------------------------------------------------------
 
-	private final TypeSerializer<Object>[] fieldSerializers;
+	/** The POJO type class. */
+	private final Class<T> clazz;
 
+	/**
+	 * Fields of the POJO and their serializers.
+	 *
+	 * <p>The fields are kept as a separate transient member, with their serialization
+	 * handled with the {@link #readObject(ObjectInputStream)} and {@link #writeObject(ObjectOutputStream)}
+	 * methods.
+	 *
+	 * <p>These may be reconfigured in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}.
+	 */
+	private transient Field[] fields;
+	private TypeSerializer<Object>[] fieldSerializers;
 	private final int numFields;
 
-	private final Map<Class<?>, Integer> registeredClasses;
-
-	private final TypeSerializer<?>[] registeredSerializers;
-
+	/**
+	 * Registered subclasses and their serializers.
+	 * The mapping from each subclass to its registered class tag is maintained as a separate map, ordered by class tag.
+	 *
+	 * <p>These may be reconfigured in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}.
+	 */
+	private LinkedHashMap<Class<?>, Integer> registeredClasses;
+	private TypeSerializer<?>[] registeredSerializers;
+
+	/**
+	 * Cache of non-registered subclasses to their serializers, created on-the-fly.
+	 *
+	 * <p>The contents of this cache are captured in configuration snapshots, and the cache is repopulated
+	 * with reconfigured serializers in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}.
+	 */
+	private transient HashMap<Class<?>, TypeSerializer<?>> subclassSerializerCache;
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Configuration of the current execution.
+	 *
+	 * <p>Nested serializers created using this will have the most up-to-date configuration,
+	 * and can be resolved for backwards compatibility with previous configuration
+	 * snapshots in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}.
+	 */
 	private final ExecutionConfig executionConfig;
 
-	private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache;
 	private transient ClassLoader cl;
-	// We need to handle these ourselves in writeObject()/readObject()
-	private transient Field[] fields;
 
 	@SuppressWarnings("unchecked")
 	public PojoSerializer(
@@ -83,93 +119,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		this.numFields = fieldSerializers.length;
 		this.executionConfig = checkNotNull(executionConfig);
 
-		LinkedHashSet<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
-
 		for (int i = 0; i < numFields; i++) {
 			this.fields[i].setAccessible(true);
 		}
 
 		cl = Thread.currentThread().getContextClassLoader();
 
-		subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>();
-
 		// We only want those classes that are not our own class and are actually sub-classes.
-		List<Class<?>> cleanedTaggedClasses = new ArrayList<Class<?>>(registeredPojoTypes.size());
-		for (Class<?> registeredClass: registeredPojoTypes) {
-			if (registeredClass.equals(clazz)) {
-				continue;
-			}
-			if (!clazz.isAssignableFrom(registeredClass)) {
-				continue;
-			}
-			cleanedTaggedClasses.add(registeredClass);
-
-		}
-		this.registeredClasses = new LinkedHashMap<Class<?>, Integer>(cleanedTaggedClasses.size());
-		registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()];
+		LinkedHashSet<Class<?>> registeredSubclasses =
+				getRegisteredSubclassesFromExecutionConfig(clazz, executionConfig);
 
-		int id = 0;
-		for (Class<?> registeredClass: cleanedTaggedClasses) {
-			this.registeredClasses.put(registeredClass, id);
-			TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(registeredClass);
-			registeredSerializers[id] = typeInfo.createSerializer(executionConfig);
+		this.registeredClasses = createRegisteredSubclassTags(registeredSubclasses);
+		this.registeredSerializers = createRegisteredSubclassSerializers(registeredSubclasses, executionConfig);
 
-			id++;
-		}
-	}
-
-	private void writeObject(ObjectOutputStream out)
-			throws IOException, ClassNotFoundException {
-		out.defaultWriteObject();
-		out.writeInt(fields.length);
-		for (Field field: fields) {
-			FieldSerializer.serializeField(field, out);
-		}
-	}
-
-	private void readObject(ObjectInputStream in)
-			throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		int numFields = in.readInt();
-		fields = new Field[numFields];
-		for (int i = 0; i < numFields; i++) {
-			fields[i] = FieldSerializer.deserializeField(in);
-		}
-
-		cl = Thread.currentThread().getContextClassLoader();
-		subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>();
-	}
-
-	private TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
-		TypeSerializer<?> result = subclassSerializerCache.get(subclass);
-		if (result == null) {
-
-			TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(subclass);
-			result = typeInfo.createSerializer(executionConfig);
-			if (result instanceof PojoSerializer) {
-				PojoSerializer<?> subclassSerializer = (PojoSerializer<?>) result;
-				subclassSerializer.copyBaseFieldOrder(this);
-			}
-			subclassSerializerCache.put(subclass, result);
-
-		}
-		return result;
-	}
-
-	@SuppressWarnings("unused")
-	private boolean hasField(Field f) {
-		for (Field field: fields) {
-			if (f.equals(field)) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-	private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) {
-		// do nothing for now, but in the future, adapt subclass serializer to have same
-		// ordering as base class serializer so that binary comparison on base class fields
-		// can work
+		this.subclassSerializerCache = new HashMap<>();
 	}
 	
 	@Override
@@ -296,7 +259,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 					}
 				}
 			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before.");
+				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
 			}
 		} else {
 			TypeSerializer subclassSerializer = getSubclassSerializer(actualType);
@@ -341,13 +304,15 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 		target.writeByte(flags);
 
+		// for a non-registered subclass, write the full class name; for a registered (tagged) subclass, write the class tag id
 		if ((flags & IS_SUBCLASS) != 0) {
 			target.writeUTF(actualClass.getName());
 		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
 			target.writeByte(subclassTag);
 		}
 
-
+		// if it's a subclass, use the corresponding subclass serializer;
+		// otherwise, serialize each field with our field serializers
 		if ((flags & NO_SUBCLASS) != 0) {
 			try {
 				for (int i = 0; i < numFields; i++) {
@@ -360,8 +325,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 					}
 				}
 			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before.");
-
+				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
 			}
 		} else {
 			// subclass
@@ -418,8 +382,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 					}
 				}
 			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before.");
-
+				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
 			}
 		} else {
 			if (subclassSerializer != null) {
@@ -493,8 +456,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 					}
 				}
 			} catch (IllegalAccessException e) {
-				throw new RuntimeException(
-						"Error during POJO copy, this should not happen since we check the fields before.");
+				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
 			}
 		} else {
 			if (subclassSerializer != null) {
@@ -574,4 +536,527 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 	public boolean canEqual(Object obj) {
 		return obj instanceof PojoSerializer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public PojoSerializerConfigSnapshot<T> snapshotConfiguration() {
+		return buildConfigSnapshot(
+				clazz,
+				registeredClasses,
+				registeredSerializers,
+				fields,
+				fieldSerializers,
+				subclassSerializerCache);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof PojoSerializerConfigSnapshot) {
+			final PojoSerializerConfigSnapshot<T> config = (PojoSerializerConfigSnapshot<T>) configSnapshot;
+
+			if (clazz.equals(config.getTypeClass())) {
+				if (this.numFields == config.getFieldToSerializerConfigSnapshot().size()) {
+
+					CompatibilityResult<?> compatResult;
+
+					// ----------- check field order and compatibility of field serializers -----------
+
+					// reordered fields and their serializers;
+					// this won't be applied to this serializer until all compatibility checks have been completed
+					final Field[] reorderedFields = new Field[this.numFields];
+					final TypeSerializer<Object>[] reorderedFieldSerializers =
+						(TypeSerializer<Object>[]) new TypeSerializer<?>[this.numFields];
+
+					int i = 0;
+					for (Map.Entry<Field, TypeSerializerConfigSnapshot> fieldToConfigSnapshotEntry
+							: config.getFieldToSerializerConfigSnapshot().entrySet()) {
+
+						int fieldIndex = findField(fieldToConfigSnapshotEntry.getKey());
+						if (fieldIndex != -1) {
+							reorderedFields[i] = fieldToConfigSnapshotEntry.getKey();
+
+							compatResult = fieldSerializers[fieldIndex].ensureCompatibility(fieldToConfigSnapshotEntry.getValue());
+							if (compatResult.requiresMigration()) {
+								return CompatibilityResult.requiresMigration(null);
+							} else {
+								reorderedFieldSerializers[i] = fieldSerializers[fieldIndex];
+							}
+						} else {
+							return CompatibilityResult.requiresMigration(null);
+						}
+
+						i++;
+					}
+
+					// ---- check subclass registration order and compatibility of registered serializers ----
+
+					// reordered subclass registrations and their serializers;
+					// this won't be applied to this serializer until all compatibility checks have been completed
+					final LinkedHashMap<Class<?>, Integer> reorderedRegisteredSubclassesToClasstags;
+					final TypeSerializer<?>[] reorderedRegisteredSubclassSerializers;
+
+					final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> previousRegistrations =
+						config.getRegisteredSubclassesToSerializerConfigSnapshots();
+
+					// the reconfigured list of registered subclasses will be the previous registered
+					// subclasses in the original order with new subclasses appended at the end
+					LinkedHashSet<Class<?>> reorderedRegisteredSubclasses = new LinkedHashSet<>();
+					reorderedRegisteredSubclasses.addAll(previousRegistrations.keySet());
+					reorderedRegisteredSubclasses.addAll(
+						getRegisteredSubclassesFromExecutionConfig(clazz, executionConfig));
+
+					// re-establish the registered class tags and serializers
+					reorderedRegisteredSubclassesToClasstags = createRegisteredSubclassTags(reorderedRegisteredSubclasses);
+					reorderedRegisteredSubclassSerializers = createRegisteredSubclassSerializers(
+						reorderedRegisteredSubclasses, executionConfig);
+
+					i = 0;
+					for (TypeSerializerConfigSnapshot previousRegisteredSerializerConfig : previousRegistrations.values()) {
+						// check compatibility of subclass serializer
+						compatResult = reorderedRegisteredSubclassSerializers[i].ensureCompatibility(previousRegisteredSerializerConfig);
+						if (compatResult.requiresMigration()) {
+							return CompatibilityResult.requiresMigration(null);
+						}
+
+						i++;
+					}
+
+					// ------------------ ensure compatibility of non-registered subclass serializers ------------------
+
+					// the rebuilt cache for non-registered subclass serializers;
+					// this won't be applied to this serializer until all compatibility checks have been completed
+					HashMap<Class<?>, TypeSerializer<?>> rebuiltCache = new HashMap<>();
+
+					for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> previousCachedEntry
+							: config.getNonRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) {
+
+						TypeSerializer<?> cachedSerializer = createSubclassSerializer(previousCachedEntry.getKey());
+
+						// check compatibility of cached subclass serializer
+						compatResult = cachedSerializer.ensureCompatibility(previousCachedEntry.getValue());
+						if (compatResult.requiresMigration()) {
+							return CompatibilityResult.requiresMigration(null);
+						} else {
+							rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer);
+						}
+					}
+
+					// all compatibility checks have passed; at this point, we can just reconfigure
+					// the serializer so that it is compatible and migration is not required
+
+					this.fields = reorderedFields;
+					this.fieldSerializers = reorderedFieldSerializers;
+
+					this.registeredClasses = reorderedRegisteredSubclassesToClasstags;
+					this.registeredSerializers = reorderedRegisteredSubclassSerializers;
+
+					this.subclassSerializerCache = rebuiltCache;
+
+					return CompatibilityResult.compatible();
+				}
+			}
+		}
+
+		return CompatibilityResult.requiresMigration(null);
+	}
+
+	public static final class PojoSerializerConfigSnapshot<T> extends GenericTypeSerializerConfigSnapshot<T> {
+
+		private static final int VERSION = 1;
+
+		/**
+		 * Ordered map of POJO fields to the configuration snapshots of their corresponding serializers.
+		 *
+		 * <p>Ordering of the fields is kept so that new Pojo serializers for previous data
+		 * may reorder the fields in case they are different. The order of the fields needs to
+		 * stay the same for binary compatibility, as the field order is part of the serialization format.
+		 */
+		private LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot;
+
+		/**
+		 * Ordered map of registered subclasses to the configuration snapshots of their corresponding serializers.
+		 *
+		 * <p>Ordering of the registered subclasses is kept so that new Pojo serializers for previous data
+		 * may retain the same class tag used for registered subclasses. Newly registered subclasses that
+		 * weren't present before should be appended with the next available class tag.
+		 */
+		private LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots;
+
+		/**
+		 * Configuration snapshots of previously cached non-registered subclass serializers.
+		 *
+		 * <p>This is kept so that new Pojo serializers may eagerly repopulate their
+		 * cache with reconfigured subclass serializers.
+		 */
+		private HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public PojoSerializerConfigSnapshot() {}
+
+		public PojoSerializerConfigSnapshot(
+				Class<T> pojoType,
+				LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot,
+				LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots,
+				HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots) {
+
+			super(pojoType);
+
+			this.fieldToSerializerConfigSnapshot =
+					Preconditions.checkNotNull(fieldToSerializerConfigSnapshot);
+			this.registeredSubclassesToSerializerConfigSnapshots =
+					Preconditions.checkNotNull(registeredSubclassesToSerializerConfigSnapshots);
+			this.nonRegisteredSubclassesToSerializerConfigSnapshots =
+					Preconditions.checkNotNull(nonRegisteredSubclassesToSerializerConfigSnapshots);
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+
+			// --- write fields and their serializers, in order
+
+			out.writeInt(fieldToSerializerConfigSnapshot.size());
+			for (Map.Entry<Field, TypeSerializerConfigSnapshot> entry
+				: fieldToSerializerConfigSnapshot.entrySet()) {
+				out.writeUTF(entry.getKey().getName());
+				TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
+			}
+
+			// --- write registered subclasses and their serializers, in registration order
+
+			out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size());
+			for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> entry
+					: registeredSubclassesToSerializerConfigSnapshots.entrySet()) {
+				out.writeUTF(entry.getKey().getName());
+				TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
+			}
+
+			// --- write snapshot of non-registered subclass serializer cache
+
+			out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size());
+			for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> entry
+					: nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) {
+				out.writeUTF(entry.getKey().getName());
+				TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
+			}
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+
+			// --- read fields and their serializers, in order
+
+			int numFields = in.readInt();
+			this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields);
+			String fieldName;
+			Field field;
+			for (int i = 0; i < numFields; i++) {
+				fieldName = in.readUTF();
+
+				// search all superclasses for the field
+				Class<?> clazz = getTypeClass();
+				field = null;
+				while (clazz != null) {
+					try {
+						field = clazz.getDeclaredField(fieldName);
+						field.setAccessible(true);
+						break;
+					} catch (NoSuchFieldException e) {
+						clazz = clazz.getSuperclass();
+					}
+				}
+
+				if (field == null) {
+					// the field no longer exists in the POJO
+					throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName());
+				} else {
+					fieldToSerializerConfigSnapshot.put(
+							field,
+							TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+				}
+			}
+
+			// --- read registered subclasses and their serializers, in registration order
+
+			int numRegisteredSubclasses = in.readInt();
+			this.registeredSubclassesToSerializerConfigSnapshots = new LinkedHashMap<>(numRegisteredSubclasses);
+			String registeredSubclassname;
+			Class<?> registeredSubclass;
+			for (int i = 0; i < numRegisteredSubclasses; i++) {
+				registeredSubclassname = in.readUTF();
+				try {
+					registeredSubclass = Class.forName(registeredSubclassname, true, getUserCodeClassLoader());
+				} catch (ClassNotFoundException e) {
+					throw new IOException("Cannot find requested class " + registeredSubclassname + " in classpath.", e);
+				}
+
+				this.registeredSubclassesToSerializerConfigSnapshots.put(
+						registeredSubclass,
+						TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+			}
+
+			// --- read snapshot of non-registered subclass serializer cache
+
+			int numCachedSubclassSerializers = in.readInt();
+			this.nonRegisteredSubclassesToSerializerConfigSnapshots = new HashMap<>(numCachedSubclassSerializers);
+			String cachedSubclassname;
+			Class<?> cachedSubclass;
+			for (int i = 0; i < numCachedSubclassSerializers; i++) {
+				cachedSubclassname = in.readUTF();
+				try {
+					cachedSubclass = Class.forName(cachedSubclassname, true, getUserCodeClassLoader());
+				} catch (ClassNotFoundException e) {
+					throw new IOException("Cannot find requested class " + cachedSubclassname + " in classpath.", e);
+				}
+
+				this.nonRegisteredSubclassesToSerializerConfigSnapshots.put(
+						cachedSubclass,
+						TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+			}
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		public LinkedHashMap<Field, TypeSerializerConfigSnapshot> getFieldToSerializerConfigSnapshot() {
+			return fieldToSerializerConfigSnapshot;
+		}
+
+		public LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> getRegisteredSubclassesToSerializerConfigSnapshots() {
+			return registeredSubclassesToSerializerConfigSnapshots;
+		}
+
+		public HashMap<Class<?>, TypeSerializerConfigSnapshot> getNonRegisteredSubclassesToSerializerConfigSnapshots() {
+			return nonRegisteredSubclassesToSerializerConfigSnapshots;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return super.equals(obj)
+					&& (obj instanceof PojoSerializerConfigSnapshot)
+					&& fieldToSerializerConfigSnapshot.equals(((PojoSerializerConfigSnapshot) obj).getFieldToSerializerConfigSnapshot())
+					&& registeredSubclassesToSerializerConfigSnapshots.equals(((PojoSerializerConfigSnapshot) obj).getRegisteredSubclassesToSerializerConfigSnapshots())
+					&& nonRegisteredSubclassesToSerializerConfigSnapshots.equals(((PojoSerializerConfigSnapshot) obj).nonRegisteredSubclassesToSerializerConfigSnapshots);
+		}
+
+		@Override
+		public int hashCode() {
+			return super.hashCode()
+				+ Objects.hash(
+					fieldToSerializerConfigSnapshot,
+					registeredSubclassesToSerializerConfigSnapshots,
+					nonRegisteredSubclassesToSerializerConfigSnapshots);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void writeObject(ObjectOutputStream out)
+		throws IOException, ClassNotFoundException {
+		out.defaultWriteObject();
+		out.writeInt(fields.length);
+		for (Field field: fields) {
+			FieldSerializer.serializeField(field, out);
+		}
+	}
+
+	private void readObject(ObjectInputStream in)
+		throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		int numFields = in.readInt();
+		fields = new Field[numFields];
+		for (int i = 0; i < numFields; i++) {
+			fields[i] = FieldSerializer.deserializeField(in);
+		}
+
+		cl = Thread.currentThread().getContextClassLoader();
+		subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Utilities
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Extracts the subclasses of the base POJO class registered in the execution config.
+	 */
+	private static LinkedHashSet<Class<?>> getRegisteredSubclassesFromExecutionConfig(
+			Class<?> basePojoClass,
+			ExecutionConfig executionConfig) {
+
+		LinkedHashSet<Class<?>> subclassesInRegistrationOrder = new LinkedHashSet<>(executionConfig.getRegisteredPojoTypes().size());
+		for (Class<?> registeredClass : executionConfig.getRegisteredPojoTypes()) {
+			if (registeredClass.equals(basePojoClass)) {
+				continue;
+			}
+			if (!basePojoClass.isAssignableFrom(registeredClass)) {
+				continue;
+			}
+			subclassesInRegistrationOrder.add(registeredClass);
+		}
+
+		return subclassesInRegistrationOrder;
+	}
+
+	/**
+	 * Builds map of registered subclasses to their class tags.
+	 * Class tags will be integers starting from 0, assigned incrementally with the order of provided subclasses.
+	 */
+	private static LinkedHashMap<Class<?>, Integer> createRegisteredSubclassTags(LinkedHashSet<Class<?>> registeredSubclasses) {
+		final LinkedHashMap<Class<?>, Integer> classToTag = new LinkedHashMap<>();
+
+		int id = 0;
+		for (Class<?> registeredClass : registeredSubclasses) {
+			classToTag.put(registeredClass, id);
+			id ++;
+		}
+
+		return classToTag;
+	}
+
+	/**
+	 * Creates an array of serializers for provided list of registered subclasses.
+	 * Order of returned serializers will correspond to order of provided subclasses.
+	 */
+	private static TypeSerializer<?>[] createRegisteredSubclassSerializers(
+			LinkedHashSet<Class<?>> registeredSubclasses,
+			ExecutionConfig executionConfig) {
+
+		final TypeSerializer<?>[] subclassSerializers = new TypeSerializer[registeredSubclasses.size()];
+
+		int i = 0;
+		for (Class<?> registeredClass : registeredSubclasses) {
+			subclassSerializers[i] = TypeExtractor.createTypeInfo(registeredClass).createSerializer(executionConfig);
+			i++;
+		}
+
+		return subclassSerializers;
+	}
+
+	/**
+	 * Fetches cached serializer for a non-registered subclass;
+	 * also creates the serializer if it doesn't exist yet.
+	 *
+	 * <p>This method is package-private rather than private
+	 * so that it can be called from tests.
+	 */
+	TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
+		TypeSerializer<?> result = subclassSerializerCache.get(subclass);
+		if (result == null) {
+			result = createSubclassSerializer(subclass);
+			subclassSerializerCache.put(subclass, result);
+		}
+		return result;
+	}
+
+	private TypeSerializer<?> createSubclassSerializer(Class<?> subclass) {
+		TypeSerializer<?> serializer = TypeExtractor.createTypeInfo(subclass).createSerializer(executionConfig);
+
+		if (serializer instanceof PojoSerializer) {
+			PojoSerializer<?> subclassSerializer = (PojoSerializer<?>) serializer;
+			subclassSerializer.copyBaseFieldOrder(this);
+		}
+
+		return serializer;
+	}
+
+	/**
+	 * Finds and returns the order (0-based) of a POJO field.
+	 * Returns -1 if the field does not exist for this POJO.
+	 */
+	private int findField(Field f) {
+		int foundIndex = 0;
+		for (Field field : fields) {
+			if (f.equals(field)) {
+				return foundIndex;
+			}
+
+			foundIndex++;
+		}
+
+		return -1;
+	}
+
+	private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) {
+		// do nothing for now, but in the future, adapt subclass serializer to have same
+		// ordering as base class serializer so that binary comparison on base class fields
+		// can work
+	}
+
+	/**
+	 * Build and return a snapshot of the serializer's parameters and currently cached serializers.
+	 */
+	private static <T> PojoSerializerConfigSnapshot<T> buildConfigSnapshot(
+			Class<T> pojoType,
+			LinkedHashMap<Class<?>, Integer> registeredSubclassesToTags,
+			TypeSerializer<?>[] registeredSubclassSerializers,
+			Field[] fields,
+			TypeSerializer<?>[] fieldSerializers,
+			HashMap<Class<?>, TypeSerializer<?>> nonRegisteredSubclassSerializerCache) {
+
+		final LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshots =
+			new LinkedHashMap<>(fields.length);
+
+		for (int i = 0; i < fields.length; i++) {
+			fieldToSerializerConfigSnapshots.put(fields[i], fieldSerializers[i].snapshotConfiguration());
+		}
+
+		final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots =
+				new LinkedHashMap<>(registeredSubclassesToTags.size());
+
+		for (Map.Entry<Class<?>, Integer> entry : registeredSubclassesToTags.entrySet()) {
+			registeredSubclassesToSerializerConfigSnapshots.put(
+					entry.getKey(),
+					registeredSubclassSerializers[entry.getValue()].snapshotConfiguration());
+		}
+
+		final HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots =
+				new LinkedHashMap<>(nonRegisteredSubclassSerializerCache.size());
+
+		for (Map.Entry<Class<?>, TypeSerializer<?>> entry : nonRegisteredSubclassSerializerCache.entrySet()) {
+			nonRegisteredSubclassesToSerializerConfigSnapshots.put(entry.getKey(), entry.getValue().snapshotConfiguration());
+		}
+
+		return new PojoSerializerConfigSnapshot<>(
+				pojoType,
+				fieldToSerializerConfigSnapshots,
+				registeredSubclassesToSerializerConfigSnapshots,
+				nonRegisteredSubclassesToSerializerConfigSnapshots);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Test utilities
+	// --------------------------------------------------------------------------------------------
+
+	@VisibleForTesting
+	Field[] getFields() {
+		return fields;
+	}
+
+	@VisibleForTesting
+	TypeSerializer<?>[] getFieldSerializers() {
+		return fieldSerializers;
+	}
+
+	@VisibleForTesting
+	LinkedHashMap<Class<?>, Integer> getRegisteredClasses() {
+		return registeredClasses;
+	}
+
+	@VisibleForTesting
+	TypeSerializer<?>[] getRegisteredSerializers() {
+		return registeredSerializers;
+	}
+
+	@VisibleForTesting
+	HashMap<Class<?>, TypeSerializer<?>> getSubclassSerializerCache() {
+		return subclassSerializerCache;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index dbd5d3a..5770dac 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -18,12 +18,17 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Row;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.Arrays;
 
 import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
@@ -35,12 +40,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Serializer for {@link Row}.
  */
 @Internal
-public class RowSerializer extends TypeSerializer<Row> {
+public final class RowSerializer extends TypeSerializer<Row> {
 
 	private static final long serialVersionUID = 1L;
-	private final boolean[] nullMask;
+
 	private final TypeSerializer<Object>[] fieldSerializers;
 
+	private transient boolean[] nullMask;
+
+	@SuppressWarnings("unchecked")
 	public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
 		this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
 		this.nullMask = new boolean[fieldSerializers.length];
@@ -231,4 +239,73 @@ public class RowSerializer extends TypeSerializer<Row> {
 	public int hashCode() {
 		return Arrays.hashCode(fieldSerializers);
 	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		this.nullMask = new boolean[fieldSerializers.length];
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public RowSerializerConfigSnapshot snapshotConfiguration() {
+		return new RowSerializerConfigSnapshot(TypeSerializerUtil.snapshotConfigurations(fieldSerializers));
+	}
+
+	/** Resolves compatibility field by field; compatible fields reuse their current serializer as fallback. */
+	@Override
+	public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof RowSerializerConfigSnapshot) {
+			TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots =
+				((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
+
+			if (fieldSerializerConfigSnapshots.length == fieldSerializers.length) {
+				boolean requireMigration = false;
+				TypeSerializer<?>[] convertDeserializers = new TypeSerializer<?>[fieldSerializers.length];
+
+				CompatibilityResult<?> compatResult;
+				for (int i = 0; i < fieldSerializers.length; i++) {
+					compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
+					if (!compatResult.requiresMigration()) {
+						// never leave a null slot: a compatible field is read with its current serializer
+						convertDeserializers[i] = fieldSerializers[i];
+					} else if (compatResult.getConvertDeserializer() == null) {
+						// one of the field serializers cannot provide a fallback deserializer
+						return CompatibilityResult.requiresMigration(null);
+					} else {
+						requireMigration = true;
+						convertDeserializers[i] = compatResult.getConvertDeserializer();
+					}
+				}
+
+				if (!requireMigration) {
+					return CompatibilityResult.compatible();
+				}
+				return CompatibilityResult.requiresMigration(new RowSerializer(convertDeserializers));
+			}
+		}
+
+		return CompatibilityResult.requiresMigration(null);
+	}
+
+	public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+
+		private static final int VERSION = 1;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public RowSerializerConfigSnapshot() {}
+
+		public RowSerializerConfigSnapshot(TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) {
+			super(fieldSerializerConfigSnapshots);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index afc4aa2..68d5aa8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -19,7 +19,10 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -115,4 +118,43 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 	public boolean canEqual(Object obj) {
 		return obj instanceof TupleSerializerBase;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public TupleSerializerConfigSnapshot<T> snapshotConfiguration() {
+		return new TupleSerializerConfigSnapshot<>(
+				tupleClass,
+				TypeSerializerUtil.snapshotConfigurations(fieldSerializers));
+	}
+
+	/** Compatible only if the tuple class matches and no field serializer requires migration. */
+	@SuppressWarnings("unchecked")
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof TupleSerializerConfigSnapshot) {
+			final TupleSerializerConfigSnapshot<T> config = (TupleSerializerConfigSnapshot<T>) configSnapshot;
+
+			if (tupleClass.equals(config.getTupleClass())) {
+				TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots =
+					config.getNestedSerializerConfigSnapshots();
+				if (fieldSerializerConfigSnapshots.length == fieldSerializers.length) {
+					// a single field that requires migration makes the whole tuple require migration
+					CompatibilityResult<?> compatResult;
+					for (int i = 0; i < fieldSerializers.length; i++) {
+						compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
+						if (compatResult.requiresMigration()) {
+							return CompatibilityResult.requiresMigration(null);
+						}
+					}
+
+					return CompatibilityResult.compatible();
+				}
+			}
+		}
+
+		return CompatibilityResult.requiresMigration(null);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
new file mode 100644
index 0000000..6d2bb5f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Snapshot of a tuple serializer's configuration.
+ */
+@Internal
+public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	private Class<T> tupleClass;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public TupleSerializerConfigSnapshot() {}
+
+	public TupleSerializerConfigSnapshot(
+			Class<T> tupleClass,
+			TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) {
+
+		super(fieldSerializerConfigSnapshots);
+
+		this.tupleClass = Preconditions.checkNotNull(tupleClass);
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
+
+		InstantiationUtil.serializeObject(new DataOutputViewStream(out), tupleClass);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		super.read(in);
+
+		try {
+			tupleClass = InstantiationUtil.deserializeObject(new DataInputViewStream(in), getUserCodeClassLoader());
+		} catch (ClassNotFoundException e) {
+			throw new IOException("Could not find requested tuple class in classpath.", e);
+		}
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	public Class<T> getTupleClass() {
+		return tupleClass;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return super.equals(obj)
+			&& (obj instanceof TupleSerializerConfigSnapshot)
+			&& (tupleClass.equals(((TupleSerializerConfigSnapshot) obj).getTupleClass()));
+	}
+
+	@Override
+	public int hashCode() {
+		return super.hashCode() * 31 + tupleClass.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 56e204c..10e2330 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -19,15 +19,20 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.LinkedHashMap;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
 
 import com.esotericsoftware.kryo.Kryo;
+import org.apache.flink.util.Preconditions;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -39,12 +44,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <T> The type serialized.
  */
 @Internal
-public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
+public final class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 
 	private static final long serialVersionUID = 1L;
 	
 	private final Class<T> type;
-	
+
+	/**
+	 * Map of class tag (using classname as tag) to their Kryo registration.
+	 *
+	 * <p>This map serves as a preview of the final registration result of
+	 * the Kryo instance, taking into account registration overwrites.
+	 *
+	 * <p>Currently, we only have one single registration for the value type.
+	 * Nevertheless, we keep this information here for future compatibility.
+	 */
+	private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
+
 	private transient Kryo kryo;
 	
 	private transient T copyInstance;
@@ -53,6 +69,7 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	
 	public ValueSerializer(Class<T> type) {
 		this.type = checkNotNull(type);
+		this.kryoRegistrations = asKryoRegistrations(type);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -126,7 +143,8 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 			kryo.setInstantiatorStrategy(instantiatorStrategy);
 
 			this.kryo.setAsmEnabled(true);
-			this.kryo.register(type);
+
+			KryoUtils.applyRegistrations(this.kryo, kryoRegistrations.values());
 		}
 	}
 	
@@ -152,4 +170,66 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	public boolean canEqual(Object obj) {
 		return obj instanceof ValueSerializer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public ValueSerializerConfigSnapshot<T> snapshotConfiguration() {
+		return new ValueSerializerConfigSnapshot<>(type);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof ValueSerializerConfigSnapshot) {
+			final ValueSerializerConfigSnapshot<T> config = (ValueSerializerConfigSnapshot<T>) configSnapshot;
+
+			if (type.equals(config.getTypeClass())) {
+				// currently, simply checking the type of the value class is sufficient;
+				// in the future, if there are more Kryo registrations, we should try to resolve that
+				return CompatibilityResult.compatible();
+			}
+		}
+
+		return CompatibilityResult.requiresMigration(null);
+	}
+
+	public static class ValueSerializerConfigSnapshot<T extends Value> extends KryoRegistrationSerializerConfigSnapshot<T> {
+
+		private static final int VERSION = 1;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public ValueSerializerConfigSnapshot() {}
+
+		public ValueSerializerConfigSnapshot(Class<T> valueTypeClass) {
+			super(valueTypeClass, asKryoRegistrations(valueTypeClass));
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		// kryoRegistrations may be null if this value serializer is deserialized from an old version
+		if (kryoRegistrations == null) {
+			this.kryoRegistrations = asKryoRegistrations(type);
+		}
+	}
+
+	private static LinkedHashMap<String, KryoRegistration> asKryoRegistrations(Class<?> type) {
+		Preconditions.checkNotNull(type);
+		// tag the registration with the value class's own name; type.getClass() is always java.lang.Class
+		LinkedHashMap<String, KryoRegistration> registration = new LinkedHashMap<>(1);
+		registration.put(type.getName(), new KryoRegistration(type));
+
+		return registration;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index cba0c84..a172b72 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -21,18 +21,22 @@ package org.apache.flink.api.java.typeutils.runtime.kryo;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
 import org.apache.avro.generic.GenericData;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
 import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -45,6 +49,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
@@ -72,11 +77,16 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
 	// ------------------------------------------------------------------------
 
-	private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
-	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
 	private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
 	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
-	private final LinkedHashSet<Class<?>> registeredTypes;
+
+	/**
+	 * Map of class tag (using classname as tag) to their Kryo registration.
+	 *
+	 * <p>This map serves as a preview of the final registration result of
+	 * the Kryo instance, taking into account registration overwrites.
+	 */
+	private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
 
 	private final Class<T> type;
 	
@@ -93,26 +103,35 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	private transient Output output;
 
 	// ------------------------------------------------------------------------
+	// legacy fields; these fields cannot yet be removed to retain backwards compatibility
+
+	private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
+	private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
+	private LinkedHashSet<Class<?>> registeredTypes;
+
+	// ------------------------------------------------------------------------
 
 	public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
 		this.type = checkNotNull(type);
 
 		this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
 		this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
-		this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
-		this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
-		this.registeredTypes = executionConfig.getRegisteredKryoTypes();
+
+		this.kryoRegistrations = buildKryoRegistrations(
+				this.type,
+				executionConfig.getRegisteredKryoTypes(),
+				executionConfig.getRegisteredTypesWithKryoSerializerClasses(),
+				executionConfig.getRegisteredTypesWithKryoSerializers());
 	}
 
 	/**
 	 * Copy-constructor that does not copy transient fields. They will be initialized once required.
 	 */
 	protected KryoSerializer(KryoSerializer<T> toCopy) {
-		registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
-		registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
 		defaultSerializers = toCopy.defaultSerializers;
 		defaultSerializerClasses = toCopy.defaultSerializerClasses;
-		registeredTypes = toCopy.registeredTypes;
+
+		kryoRegistrations = toCopy.kryoRegistrations;
 
 		type = toCopy.type;
 		if(type == null){
@@ -272,11 +291,9 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 		if (obj instanceof KryoSerializer) {
 			KryoSerializer<?> other = (KryoSerializer<?>) obj;
 
-			// we cannot include the Serializers here because they don't implement the equals method
 			return other.canEqual(this) &&
 				type == other.type &&
-				registeredTypes.equals(other.registeredTypes) &&
-				registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
+				kryoRegistrations.equals(other.kryoRegistrations) &&
 				defaultSerializerClasses.equals(other.defaultSerializerClasses);
 		} else {
 			return false;
@@ -334,7 +351,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			//       This is due to a know issue with Kryo's JavaSerializer. See FLINK-6025 for details.
 			kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
 
-			// Add default serializers first, so that they type registrations without a serializer
+			// Add default serializers first, so that the type registrations without a serializer
 			// are registered with a default serializer
 			for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) {
 				kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
@@ -344,59 +361,152 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 				kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
 			}
 
-			// register the type of our class
-			kryo.register(type);
+			KryoUtils.applyRegistrations(this.kryo, kryoRegistrations.values());
 
-			// register given types. we do this first so that any registration of a
-			// more specific serializer overrides this
-			for (Class<?> type : registeredTypes) {
-				kryo.register(type);
-			}
+			kryo.setRegistrationRequired(false);
+			kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+		}
+	}
 
-			// register given serializer classes
-			for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
-				Class<?> typeClass = e.getKey();
-				Class<? extends Serializer<?>> serializerClass = e.getValue();
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
 
-				Serializer<?> serializer =
-						ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
-				kryo.register(typeClass, serializer);
-			}
+	@Override
+	public KryoSerializerConfigSnapshot<T> snapshotConfiguration() {
+		return new KryoSerializerConfigSnapshot<>(type, kryoRegistrations);
+	}
 
-			// register given serializers
-			for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
-				kryo.register(e.getKey(), e.getValue().getSerializer());
+	@SuppressWarnings("unchecked")
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof KryoSerializerConfigSnapshot) {
+			final KryoSerializerConfigSnapshot<T> config = (KryoSerializerConfigSnapshot<T>) configSnapshot;
+
+			if (type.equals(config.getTypeClass())) {
+				LinkedHashMap<String, KryoRegistration> reconfiguredRegistrations = config.getKryoRegistrations();
+
+				// reconfigure by assuring that classes which were previously registered are registered
+				// again in the exact same order; new class registrations will be appended.
+				// this also overwrites any dummy placeholders that the restored old configuration has
+				reconfiguredRegistrations.putAll(kryoRegistrations);
+
+				// check if there are still any dummy placeholders even after reconfiguration;
+				// if so, then this new Kryo serializer cannot read old data and is therefore incompatible
+				for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : reconfiguredRegistrations.entrySet()) {
+					if (reconfiguredRegistrationEntry.getValue().isDummy()) {
+						LOG.warn("The Kryo registration for a previously registered class {} does not have a " +
+							"proper serializer, because its previous serializer cannot be loaded or is no " +
+							"longer valid but a new serializer is not available", reconfiguredRegistrationEntry.getKey());
+
+						return CompatibilityResult.requiresMigration(null);
+					}
+				}
+
+				// there's actually no way to tell if new Kryo serializers are compatible with
+				// the previous ones they overwrite; we can only signal compatibility and hope for the best
+				this.kryoRegistrations = reconfiguredRegistrations;
+				return CompatibilityResult.compatible();
 			}
-			// this is needed for Avro but can not be added on demand.
-			kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());
+		}
 
-			kryo.setRegistrationRequired(false);
-			kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+		return CompatibilityResult.requiresMigration(null);
+	}
+
+	public static final class KryoSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
+
+		private static final int VERSION = 1;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public KryoSerializerConfigSnapshot() {}
+
+		public KryoSerializerConfigSnapshot(
+				Class<T> typeClass,
+				LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
+
+			super(typeClass, kryoRegistrations);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
 		}
 	}
 
 	// --------------------------------------------------------------------------------------------
-	// For testing
+	// Utilities
 	// --------------------------------------------------------------------------------------------
-	
-	public Kryo getKryo() {
-		checkKryoInitialized();
-		return this.kryo;
+
+	/**
+	 * Utility method that takes lists of registered types and their serializers, and resolves
+	 * them into a single list such that the result will resemble the final registration
+	 * result in Kryo.
+	 */
+	private static LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(
+			Class<?> serializedType,
+			LinkedHashSet<Class<?>> registeredTypes,
+			LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses,
+			LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers) {
+
+		final LinkedHashMap<String, KryoRegistration> kryoRegistrations = new LinkedHashMap<>();
+
+		kryoRegistrations.put(serializedType.getName(), new KryoRegistration(serializedType));
+
+		for (Class<?> registeredType : checkNotNull(registeredTypes)) {
+			kryoRegistrations.put(registeredType.getName(), new KryoRegistration(registeredType));
+		}
+
+		for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> registeredTypeWithSerializerClassEntry :
+				checkNotNull(registeredTypesWithSerializerClasses).entrySet()) {
+
+			kryoRegistrations.put(
+					registeredTypeWithSerializerClassEntry.getKey().getName(),
+					new KryoRegistration(
+							registeredTypeWithSerializerClassEntry.getKey(),
+							registeredTypeWithSerializerClassEntry.getValue()));
+		}
+
+		for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypeWithSerializerEntry :
+				checkNotNull(registeredTypesWithSerializers).entrySet()) {
+
+			kryoRegistrations.put(
+					registeredTypeWithSerializerEntry.getKey().getName(),
+					new KryoRegistration(
+							registeredTypeWithSerializerEntry.getKey(),
+							registeredTypeWithSerializerEntry.getValue()));
+		}
+
+		kryoRegistrations.put(
+				GenericData.Array.class.getName(),
+				new KryoRegistration(
+						GenericData.Array.class,
+						new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+
+		return kryoRegistrations;
 	}
 
-	@Override
-	public boolean canRestoreFrom(TypeSerializer<?> other) {
-		if (other instanceof KryoSerializer) {
-			KryoSerializer<?> otherKryo = (KryoSerializer<?>) other;
+	// --------------------------------------------------------------------------------------------
 
-			// we cannot include the Serializers here because they don't implement the equals method
-			return other.canEqual(this) &&
-					type == otherKryo.type &&
-					(registeredTypes.equals(otherKryo.registeredTypes) || otherKryo.registeredTypes.isEmpty()) &&
-					(registeredTypesWithSerializerClasses.equals(otherKryo.registeredTypesWithSerializerClasses) || otherKryo.registeredTypesWithSerializerClasses.isEmpty()) &&
-					(defaultSerializerClasses.equals(otherKryo.defaultSerializerClasses) || otherKryo.defaultSerializerClasses.isEmpty());
-		} else {
-			return false;
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		// kryoRegistrations may be null if this Kryo serializer is deserialized from an old version
+		if (kryoRegistrations == null) {
+			this.kryoRegistrations = buildKryoRegistrations(
+					type,
+					registeredTypes,
+					registeredTypesWithSerializerClasses,
+					registeredTypesWithSerializers);
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// For testing
+	// --------------------------------------------------------------------------------------------
+
+	@VisibleForTesting
+	public Kryo getKryo() {
+		checkKryoInitialized();
+		return this.kryo;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 91c6145..a846703 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -31,6 +32,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 
@@ -85,6 +89,38 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
+
+	@Test
+	public void testConfigSnapshotInstantiation() {
+		TypeSerializerConfigSnapshot configSnapshot = getSerializer().snapshotConfiguration();
+
+		InstantiationUtil.instantiate(configSnapshot.getClass());
+	}
+
+	@Test
+	public void testSnapshotConfigurationAndReconfigure() throws Exception {
+		final TypeSerializerConfigSnapshot configSnapshot = getSerializer().snapshotConfiguration();
+
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerUtil.writeSerializerConfigSnapshot(
+				new DataOutputViewStreamWrapper(out), configSnapshot);
+			serializedConfig = out.toByteArray();
+		}
+
+		TypeSerializerConfigSnapshot restoredConfig;
+		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			restoredConfig = TypeSerializerUtil.readSerializerConfigSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		CompatibilityResult strategy = getSerializer().ensureCompatibility(restoredConfig);
+		assertFalse(strategy.requiresMigration());
+
+		// also verify that the serializer's ensureCompatibility implementation detects incompatibility
+		strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot());
+		assertTrue(strategy.requiresMigration());
+	}
 	
 	@Test
 	public void testGetLength() {
@@ -477,4 +513,21 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			}
 		}
 	}
+
+	public static final class TestIncompatibleSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
+		@Override
+		public int getVersion() {
+			return 0;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof TestIncompatibleSerializerConfigSnapshot;
+		}
+
+		@Override
+		public int hashCode() {
+			return getClass().hashCode();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
new file mode 100644
index 0000000..0783bb6
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests related to {@link TypeSerializerConfigSnapshot}.
+ */
+public class TypeSerializerConfigSnapshotTest {
+
+	/**
+	 * Verifies that reading and writing configuration snapshots work correctly.
+	 */
+	@Test
+	public void testSerializeConfigurationSnapshots() throws Exception {
+		TestConfigSnapshot configSnapshot1 = new TestConfigSnapshot(1, "foo");
+		TestConfigSnapshot configSnapshot2 = new TestConfigSnapshot(2, "bar");
+
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerUtil.writeSerializerConfigSnapshots(
+				new DataOutputViewStreamWrapper(out),
+				configSnapshot1,
+				configSnapshot2);
+
+			serializedConfig = out.toByteArray();
+		}
+
+		TypeSerializerConfigSnapshot[] restoredConfigs;
+		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			restoredConfigs = TypeSerializerUtil.readSerializerConfigSnapshots(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		assertEquals(2, restoredConfigs.length);
+		assertEquals(configSnapshot1, restoredConfigs[0]);
+		assertEquals(configSnapshot2, restoredConfigs[1]);
+	}
+
+	/**
+	 * Verifies that deserializing config snapshots fails if the config class cannot be found.
+	 */
+	@Test
+	public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception {
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerUtil.writeSerializerConfigSnapshot(
+				new DataOutputViewStreamWrapper(out), new TestConfigSnapshot(123, "foobar"));
+			serializedConfig = out.toByteArray();
+		}
+
+		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			// read using a dummy classloader
+			TypeSerializerUtil.readSerializerConfigSnapshot(
+				new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null));
+			fail("Expected a ClassNotFoundException wrapped in IOException");
+		} catch (IOException expected) {
+			// test passes
+		}
+	}
+
+	public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot {
+
+		static final int VERSION = 1;
+
+		private int val;
+		private String msg;
+
+		public TestConfigSnapshot() {}
+
+		public TestConfigSnapshot(int val, String msg) {
+			this.val = val;
+			this.msg = msg;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+			out.writeInt(val);
+			out.writeUTF(msg);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+			val = in.readInt();
+			msg = in.readUTF();
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj == null) {
+				return false;
+			}
+
+			if (obj instanceof TestConfigSnapshot) {
+				return val == ((TestConfigSnapshot) obj).val && msg.equals(((TestConfigSnapshot) obj).msg);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * val + msg.hashCode();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
index 5e2e733..5c615de 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
@@ -18,11 +18,25 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class EnumSerializerTest extends TestLogger {
 
 	@Test
@@ -41,6 +55,132 @@ public class EnumSerializerTest extends TestLogger {
 		new EnumSerializer<>(EmptyEnum.class);
 	}
 
+	@Test
+	public void testReconfiguration() {
+		// mock the previous ordering of enum constants to be BAR, PAULA, NATHANIEL
+		PublicEnum[] mockPreviousOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL};
+
+		// now, the actual order of FOO, BAR, PETER, NATHANIEL, EMMA, PAULA will be the "new wrong order"
+		EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class);
+
+		// verify that the serializer is first using the "wrong order" (i.e., the initial new configuration)
+		assertEquals(PublicEnum.FOO.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
+		assertEquals(PublicEnum.BAR.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
+		assertEquals(PublicEnum.PETER.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.PETER).intValue());
+		assertEquals(PublicEnum.NATHANIEL.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.NATHANIEL).intValue());
+		assertEquals(PublicEnum.EMMA.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.EMMA).intValue());
+		assertEquals(PublicEnum.PAULA.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.PAULA).intValue());
+
+		// reconfigure and verify compatibility
+		CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility(
+			new EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, mockPreviousOrder));
+		assertFalse(compatResult.requiresMigration());
+
+		// after reconfiguration, the order should be first the original BAR, PAULA, NATHANIEL,
+		// followed by the "new enum constants" FOO, PETER, EMMA
+		PublicEnum[] expectedOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL, PublicEnum.FOO, PublicEnum.PETER, PublicEnum.EMMA};
+
+		int i = 0;
+		for (PublicEnum constant : expectedOrder) {
+			assertEquals(i, serializer.getValueToOrdinal().get(constant).intValue());
+			i++;
+		}
+
+		assertTrue(Arrays.equals(expectedOrder, serializer.getValues()));
+	}
+
+	@Test
+	public void testConfigurationSnapshotSerialization() throws Exception {
+		EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class);
+
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerUtil.writeSerializerConfigSnapshot(
+				new DataOutputViewStreamWrapper(out), serializer.snapshotConfiguration());
+			serializedConfig = out.toByteArray();
+		}
+
+		TypeSerializerConfigSnapshot restoredConfig;
+		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			restoredConfig = TypeSerializerUtil.readSerializerConfigSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility(restoredConfig);
+		assertFalse(compatResult.requiresMigration());
+
+		assertEquals(PublicEnum.FOO.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
+		assertEquals(PublicEnum.BAR.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
+		assertEquals(PublicEnum.PETER.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.PETER).intValue());
+		assertEquals(PublicEnum.NATHANIEL.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.NATHANIEL).intValue());
+		assertEquals(PublicEnum.EMMA.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.EMMA).intValue());
+		assertEquals(PublicEnum.PAULA.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.PAULA).intValue());
+		assertTrue(Arrays.equals(PublicEnum.values(), serializer.getValues()));
+	}
+
+	@Test
+	public void testSerializeEnumSerializer() throws Exception {
+		EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class);
+
+		// verify original transient parameters
+		assertEquals(PublicEnum.FOO.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
+		assertEquals(PublicEnum.BAR.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
+		assertEquals(PublicEnum.PETER.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.PETER).intValue());
+		assertEquals(PublicEnum.NATHANIEL.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.NATHANIEL).intValue());
+		assertEquals(PublicEnum.EMMA.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.EMMA).intValue());
+		assertEquals(PublicEnum.PAULA.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.PAULA).intValue());
+		assertTrue(Arrays.equals(PublicEnum.values(), serializer.getValues()));
+
+		byte[] serializedSerializer = InstantiationUtil.serializeObject(serializer);
+
+		// deserialize and re-verify transient parameters
+		serializer = InstantiationUtil.deserializeObject(serializedSerializer, Thread.currentThread().getContextClassLoader());
+		assertEquals(PublicEnum.FOO.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
+		assertEquals(PublicEnum.BAR.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
+		assertEquals(PublicEnum.PETER.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.PETER).intValue());
+		assertEquals(PublicEnum.NATHANIEL.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.NATHANIEL).intValue());
+		assertEquals(PublicEnum.EMMA.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.EMMA).intValue());
+		assertEquals(PublicEnum.PAULA.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.PAULA).intValue());
+		assertTrue(Arrays.equals(PublicEnum.values(), serializer.getValues()));
+	}
+
+	@Test
+	public void testSerializeReconfiguredEnumSerializer() throws Exception {
+		// mock the previous ordering of enum constants to be BAR, PAULA, NATHANIEL
+		PublicEnum[] mockPreviousOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL};
+
+		// now, the actual order of FOO, BAR, PETER, NATHANIEL, EMMA, PAULA will be the "new wrong order"
+		EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class);
+
+		// verify that the serializer is first using the "wrong order" (i.e., the initial new configuration)
+		assertEquals(PublicEnum.FOO.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
+		assertEquals(PublicEnum.BAR.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
+		assertEquals(PublicEnum.PETER.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.PETER).intValue());
+		assertEquals(PublicEnum.NATHANIEL.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.NATHANIEL).intValue());
+		assertEquals(PublicEnum.EMMA.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.EMMA).intValue());
+		assertEquals(PublicEnum.PAULA.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.PAULA).intValue());
+
+		// reconfigure and verify compatibility
+		CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility(
+			new EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, mockPreviousOrder));
+		assertFalse(compatResult.requiresMigration());
+
+		// serialize and deserialize the serializer again
+		byte[] serializedSerializer = InstantiationUtil.serializeObject(serializer);
+		serializer = InstantiationUtil.deserializeObject(serializedSerializer, Thread.currentThread().getContextClassLoader());
+
+		// verify that after the serializer was read, the reconfigured constant ordering is untouched
+		PublicEnum[] expectedOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL, PublicEnum.FOO, PublicEnum.PETER, PublicEnum.EMMA};
+
+		int i = 0;
+		for (PublicEnum constant : expectedOrder) {
+			assertEquals(i, serializer.getValueToOrdinal().get(constant).intValue());
+			i++;
+		}
+
+		assertTrue(Arrays.equals(expectedOrder, serializer.getValues()));
+	}
+
 	@SafeVarargs
 	public final <T extends Enum<T>> void testEnumSerializer(T... data) {
 		@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java
index 9fda3d0..3301aa2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java
@@ -20,8 +20,6 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArraySerializer;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 
 /**
  * A test for the {@link LongPrimitiveArraySerializer}.