You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/07 19:47:02 UTC

[4/8] flink git commit: [FLINK-6178] [core] Allow serializer upgrades for managed state

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 4712ed1..5459d53 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -18,29 +18,43 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.Random;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * A test for the {@link PojoSerializer}.
  */
@@ -191,6 +205,20 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 			return true;
 		}
 	}
+
+	public static class SubTestUserClassA extends TestUserClass {
+		public int subDumm1;
+		public String subDumm2;
+
+		public SubTestUserClassA() {}
+	}
+
+	public static class SubTestUserClassB extends TestUserClass {
+		public Double subDumm1;
+		public float subDumm2;
+
+		public SubTestUserClassB() {}
+	}
 	
 	/**
 	 * This tests if the hashes returned by the pojo and tuple comparators are the same
@@ -240,4 +268,244 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		Assert.assertTrue("The hashing for tuples and pojos must be the same, so that they are mixable. Also for those with multiple key fields", multiPojoHash == multiTupleHash);
 		
 	}
-}	
+
+	// --------------------------------------------------------------------------------------------
+	// Configuration snapshotting & reconfiguring tests
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Verifies that reconfiguring with a config snapshot of a preceding POJO serializer
+	 * with different POJO type will result in INCOMPATIBLE.
+	 */
+	@Test
+	public void testReconfigureWithDifferentPojoType() throws Exception {
+		PojoSerializer<SubTestUserClassB> pojoSerializer1 = (PojoSerializer<SubTestUserClassB>)
+			TypeExtractor.getForClass(SubTestUserClassB.class).createSerializer(new ExecutionConfig());
+
+		// snapshot configuration and serialize to bytes
+		TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer1.snapshotConfiguration();
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+			serializedConfig = out.toByteArray();
+		}
+
+		PojoSerializer<SubTestUserClassA> pojoSerializer2 = (PojoSerializer<SubTestUserClassA>)
+			TypeExtractor.getForClass(SubTestUserClassA.class).createSerializer(new ExecutionConfig());
+
+		// read configuration again from bytes
+		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		CompatibilityResult<SubTestUserClassA> compatResult = pojoSerializer2.ensureCompatibility(pojoSerializerConfigSnapshot);
+		assertTrue(compatResult.requiresMigration());
+	}
+
+	/**
+	 * Tests that reconfiguration correctly reorders subclass registrations to their previous order.
+	 */
+	@Test
+	public void testReconfigureDifferentSubclassRegistrationOrder() throws Exception {
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.registerPojoType(SubTestUserClassA.class);
+		executionConfig.registerPojoType(SubTestUserClassB.class);
+
+		PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(executionConfig);
+
+		// get original registration ids
+		int subClassATag = pojoSerializer.getRegisteredClasses().get(SubTestUserClassA.class);
+		int subClassBTag = pojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class);
+
+		// snapshot configuration and serialize to bytes
+		TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+			serializedConfig = out.toByteArray();
+		}
+
+		// use new config and instantiate new PojoSerializer
+		executionConfig = new ExecutionConfig();
+		executionConfig.registerPojoType(SubTestUserClassB.class); // test with B registered before A
+		executionConfig.registerPojoType(SubTestUserClassA.class);
+
+		pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(executionConfig);
+
+		// read configuration from bytes
+		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
+		assertTrue(!compatResult.requiresMigration());
+
+		// reconfigure - check reconfiguration result and that registration ids remains the same
+		//assertEquals(ReconfigureResult.COMPATIBLE, pojoSerializer.reconfigure(pojoSerializerConfigSnapshot));
+		assertEquals(subClassATag, pojoSerializer.getRegisteredClasses().get(SubTestUserClassA.class).intValue());
+		assertEquals(subClassBTag, pojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class).intValue());
+	}
+
+	/**
+	 * Tests that reconfiguration repopulates previously cached subclass serializers.
+	 */
+	@Test
+	public void testReconfigureRepopulateNonregisteredSubclassSerializerCache() throws Exception {
+		// don't register any subclasses
+		PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+		// create cached serializers for SubTestUserClassA and SubTestUserClassB
+		pojoSerializer.getSubclassSerializer(SubTestUserClassA.class);
+		pojoSerializer.getSubclassSerializer(SubTestUserClassB.class);
+
+		assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
+		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
+		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
+
+		// snapshot configuration and serialize to bytes
+		TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+			serializedConfig = out.toByteArray();
+		}
+
+		// instantiate new PojoSerializer
+
+		pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+		// read configuration from bytes
+		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		// reconfigure - check reconfiguration result and that subclass serializer cache is repopulated
+		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
+		assertFalse(compatResult.requiresMigration());
+		assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
+		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
+		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
+	}
+
+	/**
+	 * Tests that:
+	 *  - Previous Pojo serializer did not have registrations, and created cached serializers for subclasses
+	 *  - On restore, it had those subclasses registered
+	 *
+	 * In this case, after reconfiguration, the cache should be repopulated, and registrations should
+	 * also exist for the subclasses.
+	 *
+	 * Note: the cache still needs to be repopulated because previous data of those subclasses were
+	 * written with the cached serializers. In this case, the repopulated cache has reconfigured serializers
+	 * for the subclasses so that previous written data can be read, but the registered serializers
+	 * for the subclasses do not necessarily need to be reconfigured since they will only be used to
+	 * write new data.
+	 */
+	@Test
+	public void testReconfigureWithPreviouslyNonregisteredSubclasses() throws Exception {
+		// don't register any subclasses at first
+		PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+		// create cached serializers for SubTestUserClassA and SubTestUserClassB
+		pojoSerializer.getSubclassSerializer(SubTestUserClassA.class);
+		pojoSerializer.getSubclassSerializer(SubTestUserClassB.class);
+
+		// make sure serializers are in cache
+		assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
+		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
+		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
+
+		// make sure that registrations are empty
+		assertTrue(pojoSerializer.getRegisteredClasses().isEmpty());
+		assertEquals(0, pojoSerializer.getRegisteredSerializers().length);
+
+		// snapshot configuration and serialize to bytes
+		TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+			serializedConfig = out.toByteArray();
+		}
+
+		// instantiate new PojoSerializer, with new execution config that has the subclass registrations
+		ExecutionConfig newExecutionConfig = new ExecutionConfig();
+		newExecutionConfig.registerPojoType(SubTestUserClassA.class);
+		newExecutionConfig.registerPojoType(SubTestUserClassB.class);
+		pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(newExecutionConfig);
+
+		// read configuration from bytes
+		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		// reconfigure - check reconfiguration result and that
+		// 1) subclass serializer cache is repopulated
+		// 2) registrations also contain the now registered subclasses
+		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
+		assertFalse(compatResult.requiresMigration());
+		assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
+		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
+		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
+		assertEquals(2, pojoSerializer.getRegisteredClasses().size());
+		assertTrue(pojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassA.class));
+		assertTrue(pojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassB.class));
+	}
+
+	/**
+	 * Verifies that reconfiguration reorders the fields of the new Pojo serializer to remain the same.
+	 */
+	@Test
+	public void testReconfigureWithDifferentFieldOrder() throws Exception {
+		Field[] mockOriginalFieldOrder = {
+			TestUserClass.class.getField("dumm4"),
+			TestUserClass.class.getField("dumm3"),
+			TestUserClass.class.getField("nestedClass"),
+			TestUserClass.class.getField("dumm1"),
+			TestUserClass.class.getField("dumm2"),
+			TestUserClass.class.getField("dumm5"),
+		};
+
+		// creating this serializer just for generating config snapshots of the field serializers
+		PojoSerializer<TestUserClass> ser = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+		LinkedHashMap<Field, TypeSerializerConfigSnapshot> mockOriginalFieldToSerializerConfigSnapshot =
+			new LinkedHashMap<>(mockOriginalFieldOrder.length);
+		mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[0], ser.getFieldSerializers()[3].snapshotConfiguration());
+		mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[1], ser.getFieldSerializers()[2].snapshotConfiguration());
+		mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[2], ser.getFieldSerializers()[5].snapshotConfiguration());
+		mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[3], ser.getFieldSerializers()[0].snapshotConfiguration());
+		mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[4], ser.getFieldSerializers()[1].snapshotConfiguration());
+		mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[5], ser.getFieldSerializers()[4].snapshotConfiguration());
+
+		PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+		assertEquals(TestUserClass.class.getField("dumm1"), pojoSerializer.getFields()[0]);
+		assertEquals(TestUserClass.class.getField("dumm2"), pojoSerializer.getFields()[1]);
+		assertEquals(TestUserClass.class.getField("dumm3"), pojoSerializer.getFields()[2]);
+		assertEquals(TestUserClass.class.getField("dumm4"), pojoSerializer.getFields()[3]);
+		assertEquals(TestUserClass.class.getField("dumm5"), pojoSerializer.getFields()[4]);
+		assertEquals(TestUserClass.class.getField("nestedClass"), pojoSerializer.getFields()[5]);
+
+		PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> mockPreviousConfigSnapshot =
+			new PojoSerializer.PojoSerializerConfigSnapshot<>(
+				TestUserClass.class,
+				mockOriginalFieldToSerializerConfigSnapshot, // this mocks the previous field order
+				new LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot>(), // empty; irrelevant for this test
+				new HashMap<Class<?>, TypeSerializerConfigSnapshot>()); // empty; irrelevant for this test
+
+		// reconfigure - check reconfiguration result and that fields are reordered to the previous order
+		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(
+
+			mockPreviousConfigSnapshot);
+		assertFalse(compatResult.requiresMigration());
+		int i = 0;
+		for (Field field : mockOriginalFieldOrder) {
+			assertEquals(field, pojoSerializer.getFields()[i]);
+			i++;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
new file mode 100644
index 0000000..60c4dc4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests related to configuration snapshotting and reconfiguring for the {@link KryoSerializer}.
+ */
+public class KryoSerializerCompatibilityTest {
+
+	/**
+	 * Verifies that reconfiguration result is INCOMPATIBLE if data type has changed.
+	 */
+	@Test
+	public void testMigrationStrategyWithDifferentKryoType() throws Exception {
+		KryoSerializer<TestClassA> kryoSerializerForA = new KryoSerializer<>(TestClassA.class, new ExecutionConfig());
+
+		// snapshot configuration and serialize to bytes
+		TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializerForA.snapshotConfiguration();
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+			serializedConfig = out.toByteArray();
+		}
+
+		KryoSerializer<TestClassB> kryoSerializerForB = new KryoSerializer<>(TestClassB.class, new ExecutionConfig());
+
+		// read configuration again from bytes
+		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			kryoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		CompatibilityResult<TestClassB> compatResult = kryoSerializerForB.ensureCompatibility(kryoSerializerConfigSnapshot);
+		assertTrue(compatResult.requiresMigration());
+	}
+
+	/**
+	 * Tests that after reconfiguration, registration ids are reconfigured to
+	 * remain the same as the preceding KryoSerializer.
+	 */
+	@Test
+	public void testMigrationStrategyForDifferentRegistrationOrder() throws Exception {
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.registerKryoType(TestClassA.class);
+		executionConfig.registerKryoType(TestClassB.class);
+
+		KryoSerializer<TestClass> kryoSerializer = new KryoSerializer<>(TestClass.class, executionConfig);
+
+		// get original registration ids
+		int testClassId = kryoSerializer.getKryo().getRegistration(TestClass.class).getId();
+		int testClassAId = kryoSerializer.getKryo().getRegistration(TestClassA.class).getId();
+		int testClassBId = kryoSerializer.getKryo().getRegistration(TestClassB.class).getId();
+
+		// snapshot configuration and serialize to bytes
+		TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializer.snapshotConfiguration();
+		byte[] serializedConfig;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+			serializedConfig = out.toByteArray();
+		}
+
+		// use new config and instantiate new KryoSerializer
+		executionConfig = new ExecutionConfig();
+		executionConfig.registerKryoType(TestClassB.class); // test with B registered before A
+		executionConfig.registerKryoType(TestClassA.class);
+
+		kryoSerializer = new KryoSerializer<>(TestClass.class, executionConfig);
+
+		// read configuration from bytes
+		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+			kryoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		// reconfigure - check reconfiguration result and that registration id remains the same
+		CompatibilityResult<TestClass> compatResult = kryoSerializer.ensureCompatibility(kryoSerializerConfigSnapshot);
+		assertFalse(compatResult.requiresMigration());
+		assertEquals(testClassId, kryoSerializer.getKryo().getRegistration(TestClass.class).getId());
+		assertEquals(testClassAId, kryoSerializer.getKryo().getRegistration(TestClassA.class).getId());
+		assertEquals(testClassBId, kryoSerializer.getKryo().getRegistration(TestClassB.class).getId());
+	}
+
+	private static class TestClass {}
+
+	private static class TestClassA {}
+
+	private static class TestClassB {}
+
+	private static class TestClassBSerializer extends Serializer {
+		@Override
+		public void write(Kryo kryo, Output output, Object o) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Object read(Kryo kryo, Input input, Class aClass) {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 3bb40eb..4dabaca 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -26,7 +26,9 @@ import static org.junit.Assert.fail;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.memory.DataInputView;
@@ -386,5 +388,15 @@ public class CollectionInputFormatTest {
 		public int hashCode() {
 			return Objects.hash(failOnRead, failOnWrite);
 		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public CompatibilityResult<ElementType> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			throw new UnsupportedOperationException();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
index 846b6c3..9e22fc2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.cep;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -39,7 +42,8 @@ import java.util.IdentityHashMap;
  *
  * @param <T> Type of the element to be serialized
  */
-public class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
+@Internal
+public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
 	private static final long serialVersionUID = -7633631762221447524L;
 
 	// underlying type serializer
@@ -192,4 +196,14 @@ public class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
 		this.identityMap = new IdentityHashMap<>();
 		this.elementList = new ArrayList<>();
 	}
+
+	@Override
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+	}
+
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index b8c4e65..70755e5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -21,6 +21,22 @@ package org.apache.flink.cep.nfa;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.LinkedHashMultimap;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -43,20 +59,6 @@ import java.util.Set;
 import java.util.Stack;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.cep.NonDuplicatingTypeSerializer;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.pattern.conditions.IterativeCondition;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
 
 /**
  * Non-deterministic finite automaton implementation.
@@ -859,7 +861,7 @@ public class NFA<T> implements Serializable {
 	/**
 	 * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
 	 */
-	public static class Serializer<T> extends TypeSerializer<NFA<T>> {
+	public static class Serializer<T> extends TypeSerializerSingleton<NFA<T>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -869,11 +871,6 @@ public class NFA<T> implements Serializable {
 		}
 
 		@Override
-		public TypeSerializer<NFA<T>> duplicate() {
-			return this;
-		}
-
-		@Override
 		public NFA<T> createInstance() {
 			return null;
 		}
@@ -944,18 +941,8 @@ public class NFA<T> implements Serializable {
 		}
 
 		@Override
-		public boolean equals(Object obj) {
-			return obj instanceof Serializer && ((Serializer) obj).canEqual(this);
-		}
-
-		@Override
 		public boolean canEqual(Object obj) {
 			return obj instanceof Serializer;
 		}
-
-		@Override
-		public int hashCode() {
-			return getClass().hashCode();
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index b6374cd..14235dc 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -21,7 +21,10 @@ package org.apache.flink.cep.operator;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
@@ -473,11 +476,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		}
 
 		@Override
-		public boolean canRestoreFrom(TypeSerializer<?> other) {
-			return equals(other) || other instanceof AbstractKeyedCEPPatternOperator.PriorityQueueSerializer;
-		}
-
-		@Override
 		public boolean equals(Object obj) {
 			if (obj instanceof PriorityQueueSerializer) {
 				@SuppressWarnings("unchecked")
@@ -498,6 +496,32 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		public int hashCode() {
 			return Objects.hash(factory, elementSerializer);
 		}
+
+		// --------------------------------------------------------------------------------------------
+		// Serializer configuration snapshotting & compatibility
+		// --------------------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			return new CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration());
+		}
+
+		@Override
+		public CompatibilityResult<PriorityQueue<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
+				CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility(
+						((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+				if (!compatResult.requiresMigration()) {
+					return CompatibilityResult.compatible();
+				} else if (compatResult.getConvertDeserializer() != null) {
+					return CompatibilityResult.requiresMigration(
+						new PriorityQueueSerializer<>(compatResult.getConvertDeserializer(), factory));
+				}
+			}
+
+			return CompatibilityResult.requiresMigration(null);
+		}
 	}
 
 	private interface PriorityQueueFactory<T> extends Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
index b86fe87..5984122 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.types.valuearray;
 
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -82,4 +83,10 @@ public final class IntValueArraySerializer extends TypeSerializerSingleton<IntVa
 	public boolean canEqual(Object obj) {
 		return obj instanceof IntValueArraySerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+			|| identifier.equals(IntPrimitiveArraySerializer.class.getCanonicalName());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
index 95219b6..e95a1a7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.types.valuearray;
 
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -82,4 +83,10 @@ public final class LongValueArraySerializer extends TypeSerializerSingleton<Long
 	public boolean canEqual(Object obj) {
 		return obj instanceof LongValueArraySerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+				|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
index 0e875e3..6dbe0e5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.types.valuearray;
 
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -82,4 +83,10 @@ public final class StringValueArraySerializer extends TypeSerializerSingleton<St
 	public boolean canEqual(Object obj) {
 		return obj instanceof StringValueArraySerializer;
 	}
+
+	@Override
+	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+		return super.isCompatibleSerializationFormatIdentifier(identifier)
+			|| identifier.equals(StringArraySerializer.class.getCanonicalName());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index 1f56a98..7ffa57c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.runtime.types
 
-import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils._
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 import org.apache.flink.types.Row
 
@@ -75,4 +75,55 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
   }
 
   override def hashCode: Int = rowSerializer.hashCode() * 13
+
+  // --------------------------------------------------------------------------------------------
+  // Serializer configuration snapshotting & compatibility
+  // --------------------------------------------------------------------------------------------
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
+    new CRowSerializer.CRowSerializerConfigSnapshot(
+      rowSerializer.snapshotConfiguration())
+  }
+
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[CRow] = {
+
+    configSnapshot match {
+      case crowSerializerConfigSnapshot: CRowSerializer.CRowSerializerConfigSnapshot =>
+        val compatResult = rowSerializer.ensureCompatibility(
+            crowSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot)
+
+        if (compatResult.requiresMigration()) {
+          if (compatResult.getConvertDeserializer != null) {
+            CompatibilityResult.requiresMigration(
+              new CRowSerializer(compatResult.getConvertDeserializer)
+            )
+          } else {
+            CompatibilityResult.requiresMigration(null)
+          }
+        } else {
+          CompatibilityResult.compatible()
+        }
+
+      case _ => CompatibilityResult.requiresMigration(null)
+    }
+  }
+}
+
+object CRowSerializer {
+
+  class CRowSerializerConfigSnapshot(
+      private var rowSerializerConfigSnapshot: TypeSerializerConfigSnapshot)
+    extends CompositeTypeSerializerConfigSnapshot(rowSerializerConfigSnapshot) {
+
+    /** This empty nullary constructor is required for deserializing the configuration. */
+    def this() = this(null)
+
+    override def getVersion: Int = CRowSerializerConfigSnapshot.VERSION
+  }
+
+  object CRowSerializerConfigSnapshot {
+    val VERSION = 1
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
index c6813b6..c4e23ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.migration;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -51,8 +54,7 @@ public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializab
 
 	@Override
 	public TypeSerializer<Serializable> duplicate() {
-		throw new UnsupportedOperationException(
-				"This is just a proxy used during migration until the real type serializer is provided by the user.");
+		return this;
 	}
 
 	@Override
@@ -103,6 +105,17 @@ public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializab
 	}
 
 	@Override
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		return new ParameterlessTypeSerializerConfig(getClass().getCanonicalName());
+	}
+
+	@Override
+	public CompatibilityResult<Serializable> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		// always assume compatibility since we're just a proxy for migration
+		return CompatibilityResult.compatible();
+	}
+
+	@Override
 	public boolean equals(Object obj) {
 		return obj instanceof MigrationNamespaceSerializerProxy;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
index f47989a..f58070e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
@@ -85,14 +85,14 @@ public abstract class AbstractMigrationRestoreStrategy<K, N, S> implements Migra
 			patchedNamespaceSerializer = (TypeSerializer<N>) VoidNamespaceSerializer.INSTANCE;
 		}
 
-		RegisteredBackendStateMetaInfo<N, S> registeredBackendStateMetaInfo =
-				new RegisteredBackendStateMetaInfo<>(
+		RegisteredKeyedBackendStateMetaInfo<N, S> registeredKeyedBackendStateMetaInfo =
+				new RegisteredKeyedBackendStateMetaInfo<>(
 						StateDescriptor.Type.UNKNOWN,
 						stateName,
 						patchedNamespaceSerializer,
 						stateSerializer);
 
-		final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredBackendStateMetaInfo);
+		final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredKeyedBackendStateMetaInfo);
 		final DataInputView inView = openDataInputView();
 		final int keyGroup = keyGroupRange.getStartKeyGroup();
 		final int numNamespaces = inView.readInt();

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 0badb41..8fbc227 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -17,7 +17,10 @@
  */
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -132,4 +135,30 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 	public int hashCode() {
 		return elementSerializer.hashCode();
 	}
-}
\ No newline at end of file
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		return new CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration());
+	}
+
+	@Override
+	public CompatibilityResult<ArrayList<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
+			CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility(
+				((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+			if (!compatResult.requiresMigration()) {
+				return CompatibilityResult.compatible();
+			} else if (compatResult.getConvertDeserializer() != null) {
+				return CompatibilityResult.requiresMigration(
+					new ArrayListSerializer<>(compatResult.getConvertDeserializer()));
+			}
+		}
+
+		return CompatibilityResult.requiresMigration(null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index e7ed26f..ec4aa81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -207,26 +207,18 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 					final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
 						new HashMap<>(registeredStatesDeepCopies.size());
 
-					List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
+					List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> metaInfoSnapshots =
 						new ArrayList<>(registeredStatesDeepCopies.size());
 
-					for (Map.Entry<String, PartitionableListState<?>> entry :
-						registeredStatesDeepCopies.entrySet()) {
-
-						PartitionableListState<?> state = entry.getValue();
-						OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
-							new OperatorBackendSerializationProxy.StateMetaInfo<>(
-								state.getName(),
-								state.getPartitionStateSerializer(),
-								state.getAssignmentMode());
-						metaInfoList.add(metaInfo);
+					for (Map.Entry<String, PartitionableListState<?>> entry : registeredStatesDeepCopies.entrySet()) {
+						metaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
 					}
 
 					CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
 					DataOutputView dov = new DataOutputViewStreamWrapper(out);
 
 					OperatorBackendSerializationProxy backendSerializationProxy =
-						new OperatorBackendSerializationProxy(metaInfoList);
+						new OperatorBackendSerializationProxy(metaInfoSnapshots);
 
 					backendSerializationProxy.write(dov);
 
@@ -237,7 +229,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 						PartitionableListState<?> value = entry.getValue();
 						long[] partitionOffsets = value.write(out);
-						OperatorStateHandle.Mode mode = value.getAssignmentMode();
+						OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
 						writtenStatesMetaData.put(
 							entry.getKey(),
 							new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
@@ -254,10 +246,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 						return null;
 					}
 
-					OperatorStateHandle operatorStateHandle =
-						new OperatorStateHandle(writtenStatesMetaData, stateHandle);
-
-					return operatorStateHandle;
+					return new OperatorStateHandle(writtenStatesMetaData, stateHandle);
 				}
 			};
 
@@ -298,25 +287,23 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 				backendSerializationProxy.read(new DataInputViewStreamWrapper(in));
 
-				List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
-						backendSerializationProxy.getNamedStateSerializationProxies();
+				List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredMetaInfoSnapshots =
+						backendSerializationProxy.getStateMetaInfoSnapshots();
 
 				// Recreate all PartitionableListStates from the meta info
-				for (OperatorBackendSerializationProxy.StateMetaInfo<?> stateMetaInfo : metaInfoList) {
-					PartitionableListState<?> listState = registeredStates.get(stateMetaInfo.getName());
+				for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : restoredMetaInfoSnapshots) {
+					PartitionableListState<?> listState = registeredStates.get(restoredMetaInfo.getName());
 
 					if (null == listState) {
 						listState = new PartitionableListState<>(
-								stateMetaInfo.getName(),
-								stateMetaInfo.getStateSerializer(),
-								stateMetaInfo.getMode());
+								new RegisteredOperatorBackendStateMetaInfo<>(
+										restoredMetaInfo.getName(),
+										restoredMetaInfo.getPartitionStateSerializer(),
+										restoredMetaInfo.getAssignmentMode()));
 
-						registeredStates.put(listState.getName(), listState);
+						registeredStates.put(listState.getStateMetaInfo().getName(), listState);
 					} else {
-						Preconditions.checkState(listState.getPartitionStateSerializer().canRestoreFrom(
-								stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " +
-								listState.getPartitionStateSerializer() + " is not compatible with " +
-								stateMetaInfo.getStateSerializer());
+						// TODO with eager state registration in place, check here for serializer migration strategies
 					}
 				}
 
@@ -341,7 +328,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	}
 
 	/**
-	 *
 	 * Implementation of operator list state.
 	 *
 	 * @param <S> the type of an operator state partition.
@@ -349,19 +335,9 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	static final class PartitionableListState<S> implements ListState<S> {
 
 		/**
-		 * The name of the state, as registered by the user
-		 */
-		private final String name;
-
-		/**
-		 * The type serializer for the elements in the state list
-		 */
-		private final TypeSerializer<S> partitionStateSerializer;
-
-		/**
-		 * The mode how elements in this state are assigned to tasks during restore
+		 * Meta information of the state, including state name, assignment mode, and serializer
 		 */
-		private final OperatorStateHandle.Mode assignmentMode;
+		private final RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
 
 		/**
 		 * The internal list the holds the elements of the state
@@ -373,46 +349,26 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		 */
 		private final ArrayListSerializer<S> internalListCopySerializer;
 
-		public PartitionableListState(
-				String name,
-				TypeSerializer<S> partitionStateSerializer,
-				OperatorStateHandle.Mode assignmentMode) {
-
-			this(name, partitionStateSerializer, assignmentMode, new ArrayList<S>());
+		public PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+			this(stateMetaInfo, new ArrayList<S>());
 		}
 
 		private PartitionableListState(
-				String name,
-				TypeSerializer<S> partitionStateSerializer,
-				OperatorStateHandle.Mode assignmentMode,
+				RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo,
 				ArrayList<S> internalList) {
 
-			this.name = Preconditions.checkNotNull(name);
-			this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
-			this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
+			this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
 			this.internalList = Preconditions.checkNotNull(internalList);
-			this.internalListCopySerializer = new ArrayListSerializer<>(partitionStateSerializer);
+			this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
 		}
 
 		private PartitionableListState(PartitionableListState<S> toCopy) {
 
-			this(
-					toCopy.name,
-					toCopy.partitionStateSerializer.duplicate(),
-					toCopy.assignmentMode,
-					toCopy.internalListCopySerializer.copy(toCopy.internalList));
-		}
-
-		public String getName() {
-			return name;
-		}
-
-		public OperatorStateHandle.Mode getAssignmentMode() {
-			return assignmentMode;
+			this(toCopy.stateMetaInfo, toCopy.internalListCopySerializer.copy(toCopy.internalList));
 		}
 
-		public TypeSerializer<S> getPartitionStateSerializer() {
-			return partitionStateSerializer;
+		public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
+			return stateMetaInfo;
 		}
 
 		public List<S> getInternalList() {
@@ -441,8 +397,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		@Override
 		public String toString() {
 			return "PartitionableListState{" +
-					"name='" + name + '\'' +
-					", assignmentMode=" + assignmentMode +
+					"stateMetaInfo=" + stateMetaInfo +
 					", internalList=" + internalList +
 					'}';
 		}
@@ -456,7 +411,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			for (int i = 0; i < internalList.size(); ++i) {
 				S element = internalList.get(i);
 				partitionOffsets[i] = out.getPos();
-				partitionStateSerializer.serialize(element, dov);
+				getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
 			}
 
 			return partitionOffsets;
@@ -466,7 +421,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	private <S> ListState<S> getListState(
 		ListStateDescriptor<S> stateDescriptor,
 		OperatorStateHandle.Mode mode) throws IOException {
-
 		Preconditions.checkNotNull(stateDescriptor);
 
 		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
@@ -478,23 +432,27 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name);
 
 		if (null == partitionableListState) {
-
 			partitionableListState = new PartitionableListState<>(
-				name,
-				partitionStateSerializer,
-				mode);
+				new RegisteredOperatorBackendStateMetaInfo<>(
+					name,
+					partitionStateSerializer,
+					mode));
 
 			registeredStates.put(name, partitionableListState);
 		} else {
+			// TODO with eager registration in place, these checks should be moved to restore()
+
 			Preconditions.checkState(
-				partitionableListState.getAssignmentMode().equals(mode),
-				"Incompatible assignment mode. Provided: " + mode + ", expected: " +
-					partitionableListState.getAssignmentMode());
+				partitionableListState.getStateMetaInfo().getName().equals(name),
+				"Incompatible state names. " +
+					"Was [" + partitionableListState.getStateMetaInfo().getName() + "], " +
+					"registered with [" + name + "].");
+
 			Preconditions.checkState(
-				stateDescriptor.getElementSerializer().
-					canRestoreFrom(partitionableListState.getPartitionStateSerializer()),
-				"Incompatible type serializers. Provided: " + stateDescriptor.getElementSerializer() +
-					", found: " + partitionableListState.getPartitionStateSerializer());
+				partitionableListState.getStateMetaInfo().getAssignmentMode().equals(mode),
+				"Incompatible state assignment modes. " +
+					"Was [" + partitionableListState.getStateMetaInfo().getAssignmentMode() + "], " +
+					"registered with [" + mode + "].");
 		}
 
 		return partitionableListState;
@@ -509,7 +467,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			long[] offsets = metaInfo.getOffsets();
 			if (null != offsets) {
 				DataInputView div = new DataInputViewStreamWrapper(in);
-				TypeSerializer<S> serializer = stateListForName.getPartitionStateSerializer();
+				TypeSerializer<S> serializer = stateListForName.getStateMetaInfo().getPartitionStateSerializer();
 				for (long offset : offsets) {
 					in.seek(offset);
 					stateListForName.add(serializer.deserialize(div));

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
index 61cc58c..d52c207 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
@@ -18,7 +18,11 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
@@ -38,7 +42,8 @@ import java.util.Map;
  * @param <K> The type of the keys in the map.
  * @param <V> The type of the values in the map.
  */
-public class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> {
+@Internal
+public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> {
 
 	private static final long serialVersionUID = -6885593032367050078L;
 	
@@ -190,4 +195,37 @@ public class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> {
 	public int hashCode() {
 		return keySerializer.hashCode() * 31 + valueSerializer.hashCode();
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		return new MapSerializerConfigSnapshot(
+				keySerializer.snapshotConfiguration(),
+				valueSerializer.snapshotConfiguration());
+	}
+
+	@Override
+	public CompatibilityResult<HashMap<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof MapSerializerConfigSnapshot) {
+			TypeSerializerConfigSnapshot[] keyValueSerializerConfigSnapshots =
+				((MapSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
+
+			CompatibilityResult<K> keyCompatResult = keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]);
+			CompatibilityResult<V> valueCompatResult = valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]);
+
+			if (!keyCompatResult.requiresMigration() && !valueCompatResult.requiresMigration()) {
+				return CompatibilityResult.compatible();
+			} else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) {
+				return CompatibilityResult.requiresMigration(
+					new HashMapSerializer<>(
+						keyCompatResult.getConvertDeserializer(),
+						valueCompatResult.getConvertDeserializer()));
+			}
+		}
+
+		return CompatibilityResult.requiresMigration(null);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
index 512baf6..d49b1d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
@@ -31,7 +31,7 @@ import java.io.Serializable;
 
 @SuppressWarnings("serial")
 @Internal
-final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
+final class JavaSerializer<T extends Serializable> extends TypeSerializerSingleton<T> {
 
 	private static final long serialVersionUID = 5067491650263321234L;
 
@@ -41,11 +41,6 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public TypeSerializer<T> duplicate() {
-		return this;
-	}
-
-	@Override
 	public T createInstance() {
 		return null;
 	}
@@ -98,17 +93,7 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public boolean equals(Object obj) {
-		return obj instanceof JavaSerializer;
-	}
-
-	@Override
 	public boolean canEqual(Object obj) {
 		return obj instanceof JavaSerializer;
 	}
-
-	@Override
-	public int hashCode() {
-		return getClass().hashCode();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 5661c38..a389c4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
-import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -33,15 +31,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Serialization proxy for all meta data in keyed state backends. In the future we might also migrate the actual state
+ * Serialization proxy for all meta data in keyed state backends. In the future we might also requiresMigration the actual state
  * serialization logic here.
  */
 public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
 
-	public static final int VERSION = 2;
+	public static final int VERSION = 3;
 
-	private TypeSerializerSerializationProxy<?> keySerializerProxy;
-	private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
+	private TypeSerializer<?> keySerializer;
+	private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
 
 	private int restoredVersion;
 	private ClassLoader userCodeClassLoader;
@@ -50,19 +48,25 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 	}
 
-	public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
-		this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
-		this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+	public KeyedBackendSerializationProxy(
+			TypeSerializer<?> keySerializer,
+			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {
+
+		this.keySerializer = Preconditions.checkNotNull(keySerializer);
+
+		Preconditions.checkNotNull(stateMetaInfoSnapshots);
+		Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE);
+		this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+
 		this.restoredVersion = VERSION;
-		Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
 	}
 
-	public List<StateMetaInfo<?, ?>> getNamedStateSerializationProxies() {
-		return namedStateSerializationProxies;
+	public List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> getStateMetaInfoSnapshots() {
+		return stateMetaInfoSnapshots;
 	}
 
-	public TypeSerializerSerializationProxy<?> getKeySerializerProxy() {
-		return keySerializerProxy;
+	public TypeSerializer<?> getKeySerializer() {
+		return keySerializer;
 	}
 
 	@Override
@@ -82,20 +86,22 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 
 	@Override
 	public boolean isCompatibleVersion(int version) {
-		// we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x)
-		return super.isCompatibleVersion(version) || version == 1;
+		// we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x)
+		return super.isCompatibleVersion(version) || version == 2 || version == 1;
 	}
 
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		super.write(out);
 
-		keySerializerProxy.write(out);
+		new TypeSerializerSerializationProxy<>(keySerializer).write(out);
 
-		out.writeShort(namedStateSerializationProxies.size());
+		out.writeShort(stateMetaInfoSnapshots.size());
 
-		for (StateMetaInfo<?, ?> kvState : namedStateSerializationProxies) {
-			kvState.write(out);
+		for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo : stateMetaInfoSnapshots) {
+			KeyedBackendStateMetaInfoSnapshotReaderWriters
+				.getWriterForVersion(VERSION, metaInfo)
+				.writeStateMetaInfo(out);
 		}
 	}
 
@@ -103,132 +109,18 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 	public void read(DataInputView in) throws IOException {
 		super.read(in);
 
-		keySerializerProxy = new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+		final TypeSerializerSerializationProxy<?> keySerializerProxy =
+			new TypeSerializerSerializationProxy<>(userCodeClassLoader);
 		keySerializerProxy.read(in);
+		this.keySerializer = keySerializerProxy.getTypeSerializer();
 
 		int numKvStates = in.readShort();
-		namedStateSerializationProxies = new ArrayList<>(numKvStates);
-		for (int i = 0; i < numKvStates; ++i) {
-			StateMetaInfo<?, ?> stateSerializationProxy = new StateMetaInfo<>(userCodeClassLoader);
-			stateSerializationProxy.read(in);
-			namedStateSerializationProxies.add(stateSerializationProxy);
-		}
-	}
-
-	//----------------------------------------------------------------------------------------------------------------------
-
-	/**
-	 * This is the serialization proxy for {@link RegisteredBackendStateMetaInfo} for a single registered state in a
-	 * keyed backend.
-	 */
-	public static class StateMetaInfo<N, S> implements IOReadableWritable {
-
-		private StateDescriptor.Type stateType;
-		private String stateName;
-		private TypeSerializerSerializationProxy<N> namespaceSerializerSerializationProxy;
-		private TypeSerializerSerializationProxy<S> stateSerializerSerializationProxy;
-
-		private ClassLoader userClassLoader;
-
-		StateMetaInfo(ClassLoader userClassLoader) {
-			this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
-		}
-
-		public StateMetaInfo(
-				StateDescriptor.Type stateType,
-				String name,
-				TypeSerializer<N> namespaceSerializer,
-				TypeSerializer<S> stateSerializer) {
-
-			this.stateType = Preconditions.checkNotNull(stateType);
-			this.stateName = Preconditions.checkNotNull(name);
-			this.namespaceSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(namespaceSerializer));
-			this.stateSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(stateSerializer));
-		}
-
-		public StateDescriptor.Type getStateType() {
-			return stateType;
-		}
-
-		public void setStateType(StateDescriptor.Type stateType) {
-			this.stateType = stateType;
-		}
-
-		public String getStateName() {
-			return stateName;
-		}
-
-		public void setStateName(String stateName) {
-			this.stateName = stateName;
-		}
-
-		public TypeSerializerSerializationProxy<N> getNamespaceSerializerSerializationProxy() {
-			return namespaceSerializerSerializationProxy;
-		}
-
-		public void setNamespaceSerializerSerializationProxy(TypeSerializerSerializationProxy<N> namespaceSerializerSerializationProxy) {
-			this.namespaceSerializerSerializationProxy = namespaceSerializerSerializationProxy;
-		}
-
-		public TypeSerializerSerializationProxy<S> getStateSerializerSerializationProxy() {
-			return stateSerializerSerializationProxy;
-		}
-
-		public void setStateSerializerSerializationProxy(TypeSerializerSerializationProxy<S> stateSerializerSerializationProxy) {
-			this.stateSerializerSerializationProxy = stateSerializerSerializationProxy;
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			out.writeInt(getStateType().ordinal());
-			out.writeUTF(getStateName());
-
-			getNamespaceSerializerSerializationProxy().write(out);
-			getStateSerializerSerializationProxy().write(out);
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-			int enumOrdinal = in.readInt();
-			setStateType(StateDescriptor.Type.values()[enumOrdinal]);
-			setStateName(in.readUTF());
-
-			namespaceSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(userClassLoader);
-			namespaceSerializerSerializationProxy.read(in);
-
-			stateSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(userClassLoader);
-			stateSerializerSerializationProxy.read(in);
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-
-			StateMetaInfo<?, ?> that = (StateMetaInfo<?, ?>) o;
-
-			if (!getStateName().equals(that.getStateName())) {
-				return false;
-			}
-
-			if (!getNamespaceSerializerSerializationProxy().equals(that.getNamespaceSerializerSerializationProxy())) {
-				return false;
-			}
-
-			return getStateSerializerSerializationProxy().equals(that.getStateSerializerSerializationProxy());
-		}
-
-		@Override
-		public int hashCode() {
-			int result = getStateName().hashCode();
-			result = 31 * result + getNamespaceSerializerSerializationProxy().hashCode();
-			result = 31 * result + getStateSerializerSerializationProxy().hashCode();
-			return result;
+		stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+		for (int i = 0; i < numKvStates; i++) {
+			stateMetaInfoSnapshots.add(
+				KeyedBackendStateMetaInfoSnapshotReaderWriters
+					.getReaderForVersion(restoredVersion, userCodeClassLoader)
+					.readStateMetaInfo(in));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
new file mode 100644
index 0000000..83aa335
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Readers and writers for different versions of the {@link RegisteredKeyedBackendStateMetaInfo.Snapshot}.
+ * Outdated formats are also kept here for documentation of history backlog.
+ */
+public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
+
+	// -------------------------------------------------------------------------------
+	//  Writers
+	//   - v1: Flink 1.2.x
+	//   - v2: Flink 1.3.x
+	// -------------------------------------------------------------------------------
+
+	public static <N, S> KeyedBackendStateMetaInfoWriter getWriterForVersion(
+		int version, RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) {
+
+		switch (version) {
+			case 1:
+			case 2:
+				return new KeyedBackendStateMetaInfoWriterV1V2<>(stateMetaInfo);
+
+			// current version
+			case KeyedBackendSerializationProxy.VERSION:
+				return new KeyedBackendStateMetaInfoWriterV3<>(stateMetaInfo);
+
+			default:
+				// guard for future
+				throw new IllegalStateException(
+							"Unrecognized keyed backend state meta info writer version: " + version);
+		}
+	}
+
+	public interface KeyedBackendStateMetaInfoWriter {
+		void writeStateMetaInfo(DataOutputView out) throws IOException;
+	}
+
+	static abstract class AbstractKeyedBackendStateMetaInfoWriter<N, S> implements KeyedBackendStateMetaInfoWriter {
+
+		protected final RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo;
+
+		public AbstractKeyedBackendStateMetaInfoWriter(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) {
+			this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
+		}
+
+	}
+
+	static class KeyedBackendStateMetaInfoWriterV1V2<N, S> extends AbstractKeyedBackendStateMetaInfoWriter<N, S> {
+
+		public KeyedBackendStateMetaInfoWriterV1V2(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) {
+			super(stateMetaInfo);
+		}
+
+		@Override
+		public void writeStateMetaInfo(DataOutputView out) throws IOException {
+			out.writeInt(stateMetaInfo.getStateType().ordinal());
+			out.writeUTF(stateMetaInfo.getName());
+
+			new TypeSerializerSerializationProxy<>(stateMetaInfo.getNamespaceSerializer()).write(out);
+			new TypeSerializerSerializationProxy<>(stateMetaInfo.getStateSerializer()).write(out);
+		}
+	}
+
+	static class KeyedBackendStateMetaInfoWriterV3<N, S> extends AbstractKeyedBackendStateMetaInfoWriter<N, S> {
+
+		public KeyedBackendStateMetaInfoWriterV3(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) {
+			super(stateMetaInfo);
+		}
+
+		@Override
+		public void writeStateMetaInfo(DataOutputView out) throws IOException {
+			out.writeInt(stateMetaInfo.getStateType().ordinal());
+			out.writeUTF(stateMetaInfo.getName());
+
+			// write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures
+			try (
+				ByteArrayOutputStreamWithPos outWithPos = new ByteArrayOutputStreamWithPos();
+				DataOutputViewStreamWrapper outViewWrapper = new DataOutputViewStreamWrapper(outWithPos)) {
+
+				new TypeSerializerSerializationProxy<>(stateMetaInfo.getNamespaceSerializer()).write(outViewWrapper);
+
+				// write current offset, which represents the start offset of the state serializer
+				out.writeInt(outWithPos.getPosition());
+				new TypeSerializerSerializationProxy<>(stateMetaInfo.getStateSerializer()).write(outViewWrapper);
+
+				// write current offset, which represents the start of the configuration snapshots
+				out.writeInt(outWithPos.getPosition());
+				TypeSerializerUtil.writeSerializerConfigSnapshot(outViewWrapper, stateMetaInfo.getNamespaceSerializerConfigSnapshot());
+				TypeSerializerUtil.writeSerializerConfigSnapshot(outViewWrapper, stateMetaInfo.getStateSerializerConfigSnapshot());
+
+				// write total number of bytes and then flush
+				out.writeInt(outWithPos.getPosition());
+				out.write(outWithPos.getBuf(), 0, outWithPos.getPosition());
+			}
+		}
+	}
+
+
+	// -------------------------------------------------------------------------------
+	//  Readers
+	//   - v1: Flink 1.2.x
+	//   - v2: Flink 1.3.x
+	// -------------------------------------------------------------------------------
+
+	public static KeyedBackendStateMetaInfoReader getReaderForVersion(
+			int version, ClassLoader userCodeClassLoader) {
+
+		switch (version) {
+			case 1:
+			case 2:
+				return new KeyedBackendStateMetaInfoReaderV1V2<>(userCodeClassLoader);
+
+			// current version
+			case KeyedBackendSerializationProxy.VERSION:
+				return new KeyedBackendStateMetaInfoReaderV3<>(userCodeClassLoader);
+
+			default:
+				// guard for future
+				throw new IllegalStateException(
+							"Unrecognized keyed backend state meta info reader version: " + version);
+		}
+	}
+
+	public interface KeyedBackendStateMetaInfoReader<N, S> {
+		RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> readStateMetaInfo(DataInputView in) throws IOException;
+	}
+
+	static abstract class AbstractKeyedBackendStateMetaInfoReader implements KeyedBackendStateMetaInfoReader {
+
+		protected final ClassLoader userCodeClassLoader;
+
+		public AbstractKeyedBackendStateMetaInfoReader(ClassLoader userCodeClassLoader) {
+			this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+		}
+
+	}
+
+	static class KeyedBackendStateMetaInfoReaderV1V2<N, S> extends AbstractKeyedBackendStateMetaInfoReader {
+
+		public KeyedBackendStateMetaInfoReaderV1V2(ClassLoader userCodeClassLoader) {
+			super(userCodeClassLoader);
+		}
+
+		@Override
+		public RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> readStateMetaInfo(DataInputView in) throws IOException {
+			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> metaInfo =
+				new RegisteredKeyedBackendStateMetaInfo.Snapshot<>();
+
+			metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]);
+			metaInfo.setName(in.readUTF());
+
+			final TypeSerializerSerializationProxy<N> namespaceSerializerProxy =
+				new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+			namespaceSerializerProxy.read(in);
+			metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer());
+
+			final TypeSerializerSerializationProxy<S> stateSerializerProxy =
+				new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+			stateSerializerProxy.read(in);
+			metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer());
+
+			// older versions do not contain the configuration snapshot
+			metaInfo.setNamespaceSerializerConfigSnapshot(null);
+			metaInfo.setStateSerializerConfigSnapshot(null);
+
+			return metaInfo;
+		}
+	}
+
+	static class KeyedBackendStateMetaInfoReaderV3<N, S> extends AbstractKeyedBackendStateMetaInfoReader {
+
+		public KeyedBackendStateMetaInfoReaderV3(ClassLoader userCodeClassLoader) {
+			super(userCodeClassLoader);
+		}
+
+		@Override
+		public RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> readStateMetaInfo(DataInputView in) throws IOException {
+			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> metaInfo =
+				new RegisteredKeyedBackendStateMetaInfo.Snapshot<>();
+
+			metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]);
+			metaInfo.setName(in.readUTF());
+
+			// read offsets
+			int stateSerializerStartOffset = in.readInt();
+			int configSnapshotsStartOffset = in.readInt();
+
+			int totalBytes = in.readInt();
+
+			byte[] buffer = new byte[totalBytes];
+			in.readFully(buffer);
+
+			ByteArrayInputStreamWithPos inWithPos = new ByteArrayInputStreamWithPos(buffer);
+			DataInputViewStreamWrapper inViewWrapper = new DataInputViewStreamWrapper(inWithPos);
+
+			try {
+				final TypeSerializerSerializationProxy<N> namespaceSerializerProxy =
+					new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+				namespaceSerializerProxy.read(inViewWrapper);
+				metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer());
+			} catch (IOException e) {
+				metaInfo.setNamespaceSerializer(null);
+			}
+
+			// make sure we start from the state serializer bytes position
+			inWithPos.setPosition(stateSerializerStartOffset);
+			try {
+				final TypeSerializerSerializationProxy<S> stateSerializerProxy =
+					new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+				stateSerializerProxy.read(inViewWrapper);
+				metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer());
+			} catch (IOException e) {
+				metaInfo.setStateSerializer(null);
+			}
+
+			// make sure we start from the config snapshot bytes position
+			inWithPos.setPosition(configSnapshotsStartOffset);
+			metaInfo.setNamespaceSerializerConfigSnapshot(
+				TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, userCodeClassLoader));
+			metaInfo.setStateSerializerConfigSnapshot(
+				TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, userCodeClassLoader));
+
+			return metaInfo;
+		}
+	}
+}