You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/13 09:03:41 UTC

[1/3] flink git commit: [FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable

Repository: flink
Updated Branches:
  refs/heads/master 7173774d0 -> 947c44e86


[FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable

This closes #3882.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c594af09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c594af09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c594af09

Branch: refs/heads/master
Commit: c594af09767e2ef1e74dd8db187985460761b724
Parents: 7173774
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 12 19:11:25 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat May 13 14:37:49 2017 +0800

----------------------------------------------------------------------
 .../state/DefaultOperatorStateBackend.java      |  17 +++
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   8 ++
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   6 +
 .../state/heap/HeapKeyedStateBackend.java       |  17 +++
 .../runtime/state/MemoryStateBackendTest.java   | 135 +++++++++++++++++++
 .../runtime/state/OperatorStateBackendTest.java |  70 +++++++++-
 .../runtime/state/StateBackendTestBase.java     |   2 +-
 7 files changed, 251 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index ab0c1f0..1d3af72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -293,6 +294,22 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 				// Recreate all PartitionableListStates from the meta info
 				for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : restoredMetaInfoSnapshots) {
+
+					if (restoredMetaInfo.getPartitionStateSerializer() == null ||
+							restoredMetaInfo.getPartitionStateSerializer()
+								instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) {
+
+						// must fail now if the previous serializer cannot be restored because there is no serializer
+						// capable of reading previous state
+						// TODO when eager state registration is in place, we can try to get a convert deserializer
+						// TODO from the newly registered serializer instead of simply failing here
+
+						throw new IOException("Unable to restore operator state [" + restoredMetaInfo.getName() + "]." +
+							" The previous serializer of the operator state must be present; the serializer could" +
+							" have been removed from the classpath, or its implementation have changed and could" +
+							" not be loaded. This is a temporary restriction that will be fixed in future versions.");
+					}
+
 					PartitionableListState<?> listState = registeredStates.get(restoredMetaInfo.getName());
 
 					if (null == listState) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
index 83aa335..ac81e86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
@@ -28,6 +28,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -37,6 +39,8 @@ import java.io.IOException;
  */
 public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
 
+	private static final Logger LOG = LoggerFactory.getLogger(KeyedBackendStateMetaInfoSnapshotReaderWriters.class);
+
 	// -------------------------------------------------------------------------------
 	//  Writers
 	//   - v1: Flink 1.2.x
@@ -230,6 +234,8 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
 				namespaceSerializerProxy.read(inViewWrapper);
 				metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer());
 			} catch (IOException e) {
+				LOG.warn("Deserialization of previous namespace serializer errored; setting serializer to null. ", e);
+
 				metaInfo.setNamespaceSerializer(null);
 			}
 
@@ -241,6 +247,8 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
 				stateSerializerProxy.read(inViewWrapper);
 				metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer());
 			} catch (IOException e) {
+				LOG.warn("Deserialization of previous state serializer errored; setting serializer to null. ", e);
+
 				metaInfo.setStateSerializer(null);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
index 9ab106b..4f151c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -30,6 +30,8 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -39,6 +41,8 @@ import java.io.IOException;
  */
 public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
 
+	private static final Logger LOG = LoggerFactory.getLogger(OperatorBackendStateMetaInfoSnapshotReaderWriters.class);
+
 	// -------------------------------------------------------------------------------
 	//  Writers
 	//   - v1: Flink 1.2.x
@@ -219,6 +223,8 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
 				partitionStateSerializerProxy.read(inViewWrapper);
 				stateMetaInfo.setPartitionStateSerializer(partitionStateSerializerProxy.getTypeSerializer());
 			} catch (IOException e) {
+				LOG.warn("Deserialization of previous serializer errored; setting serializer to null. ", e);
+
 				stateMetaInfo.setPartitionStateSerializer(null);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 866ed28..bc314df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -389,6 +390,22 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
 
+					if (restoredMetaInfo.getStateSerializer() == null ||
+							restoredMetaInfo.getStateSerializer()
+								instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) {
+
+						// must fail now if the previous serializer cannot be restored because there is no serializer
+						// capable of reading previous state
+						// TODO when eager state registration is in place, we can try to get a convert deserializer
+						// TODO from the newly registered serializer instead of simply failing here
+
+						throw new IOException("Unable to restore keyed state [" + restoredMetaInfo.getName() + "]." +
+							" For memory-backed keyed state, the previous serializer of the keyed state must be" +
+							" present; the serializer could have been removed from the classpath, or its implementation" +
+							" have changed and could not be loaded. This is a temporary restriction that will be fixed" +
+							" in future versions.");
+					}
+
 					StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName());
 
 					//important: only create a new table we did not already create it previously

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 48d56e2..fee97f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -20,27 +20,48 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.FutureUtil;
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.concurrent.RunnableFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}.
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
 public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBackend> {
 
 	@Override
@@ -198,6 +219,120 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 		}
 	}
 
+	/**
+	 * Verifies that the operator state backend fails with appropriate error and message if
+	 * previous serializer can not be restored.
+	 */
+	@Test
+	public void testOperatorStateRestoreFailsIfSerializerDeserializationFails() throws Exception {
+		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+		Environment env = mock(Environment.class);
+		when(env.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader());
+
+		OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(env, "test-op-name");
+
+		// write some state
+		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
+		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
+		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
+		ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1);
+		ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2);
+		ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
+
+		listState1.add(42);
+		listState1.add(4711);
+
+		listState2.add(7);
+		listState2.add(13);
+		listState2.add(23);
+
+		listState3.add(17);
+		listState3.add(18);
+		listState3.add(19);
+		listState3.add(20);
+
+		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+		RunnableFuture<OperatorStateHandle> runnableFuture =
+			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+		OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);
+
+		try {
+
+			operatorStateBackend.close();
+			operatorStateBackend.dispose();
+
+			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
+				env,
+				"testOperator");
+
+			// mock failure when deserializing serializer
+			TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
+			doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+			PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+			operatorStateBackend.restore(Collections.singletonList(stateHandle));
+
+			fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
+		} catch (IOException expected) {
+			Assert.assertTrue(expected.getMessage().contains("Unable to restore operator state"));
+		} finally {
+			stateHandle.discardState();
+		}
+	}
+
+	/**
+	 * Verifies that memory-backed keyed state backend fails with appropriate error and message if
+	 * previous serializer can not be restored.
+	 */
+	@Test
+	public void testKeyedStateRestoreFailsIfSerializerDeserializationFails() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
+		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		HeapKeyedStateBackend<Integer> heapBackend = (HeapKeyedStateBackend<Integer>) backend;
+
+		assertEquals(0, heapBackend.numStateEntries());
+
+		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// write some state
+		backend.setCurrentKey(0);
+		state.update("hello");
+		state.update("ciao");
+
+		KeyedStateHandle snapshot = runSnapshot(((HeapKeyedStateBackend<Integer>) backend).snapshot(
+			682375462378L,
+			2,
+			streamFactory,
+			CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ========== restore snapshot ==========
+
+		Environment env = mock(Environment.class);
+		when(env.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader());
+
+		// mock failure when deserializing serializer
+		TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
+		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+		PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+		try {
+			restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+			fail("The keyed state restore should have failed if the previous state serializer could not be loaded.");
+		} catch (IOException expected) {
+			Assert.assertTrue(expected.getMessage().contains("Unable to restore keyed state"));
+		}
+	}
+
 	@Ignore
 	@Test
 	public void testConcurrentMapIfQueryable() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 85b9eaf..af5f0b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -22,7 +22,9 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
@@ -32,6 +34,10 @@ import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.util.FutureUtil;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.File;
 import java.io.IOException;
@@ -52,9 +58,13 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(OperatorBackendStateMetaInfoSnapshotReaderWriters.class)
 public class OperatorStateBackendTest {
 
 	private final ClassLoader classLoader = getClass().getClassLoader();
@@ -290,7 +300,7 @@ public class OperatorStateBackendTest {
 
 	@Test
 	public void testSnapshotRestoreAsync() throws Exception {
-		DefaultOperatorStateBackend operatorStateBackend =
+		OperatorStateBackend operatorStateBackend =
 				new DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true);
 
 		ListStateDescriptor<MutableType> stateDescriptor1 =
@@ -362,8 +372,7 @@ public class OperatorStateBackendTest {
 
 			AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
 
-			//TODO this is temporarily casted to test already functionality that we do not yet expose through public API
-			operatorStateBackend = (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
+			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
 					createMockEnvironment(),
 					"testOperator");
 
@@ -494,6 +503,61 @@ public class OperatorStateBackendTest {
 		}
 	}
 
+	@Test
+	public void testRestoreFailsIfSerializerDeserializationFails() throws Exception {
+		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+		OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name");
+
+		// write some state
+		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
+		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
+		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
+		ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1);
+		ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2);
+		ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
+
+		listState1.add(42);
+		listState1.add(4711);
+
+		listState2.add(7);
+		listState2.add(13);
+		listState2.add(23);
+
+		listState3.add(17);
+		listState3.add(18);
+		listState3.add(19);
+		listState3.add(20);
+
+		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+		RunnableFuture<OperatorStateHandle> runnableFuture =
+			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+		OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);
+
+		try {
+
+			operatorStateBackend.close();
+			operatorStateBackend.dispose();
+
+			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
+				createMockEnvironment(),
+				"testOperator");
+
+			// mock failure when deserializing serializer
+			TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
+			doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+			PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+			operatorStateBackend.restore(Collections.singletonList(stateHandle));
+
+			fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
+		} catch (IOException expected) {
+			Assert.assertTrue(expected.getMessage().contains("Unable to restore operator state"));
+		} finally {
+			stateHandle.discardState();
+		}
+	}
+
 	static final class MutableType implements Serializable {
 
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 96025fe..658ccde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -2508,7 +2508,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 	}
 
-	private KeyedStateHandle runSnapshot(RunnableFuture<KeyedStateHandle> snapshotRunnableFuture) throws Exception {
+	protected KeyedStateHandle runSnapshot(RunnableFuture<KeyedStateHandle> snapshotRunnableFuture) throws Exception {
 		if(!snapshotRunnableFuture.isDone()) {
 			Thread runner = new Thread(snapshotRunnableFuture);
 			runner.start();


[3/3] flink git commit: [FLINK-6554] [core] Make CompatibilityResult options more explicitly defined

Posted by tz...@apache.org.
[FLINK-6554] [core] Make CompatibilityResult options more explicitly defined

Previously, if a serializer determined that state migration needed to be
performed but could not provide a fallback convert deserializer, it
had to use CompatibilityResult.requiresMigration(null).

This commit makes this option more explicit by adding a
CompatibilityResult.requiresMigration() variant that takes no parameters.
This makes the API self-explanatory: users no longer have to rely on the
Javadoc to learn that a fallback convert deserializer may be omitted.

Consequently, when using
CompatibilityResult.requiresMigration(TypeDeserializer), the provided
argument cannot be null.

This closes #3886.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/947c44e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/947c44e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/947c44e8

Branch: refs/heads/master
Commit: 947c44e862396baa95e74cbdc50a4c7cd3befe9b
Parents: 347100d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 12 21:00:51 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat May 13 15:00:33 2017 +0800

----------------------------------------------------------------------
 .../typeutils/runtime/WritableSerializer.java   |  2 +-
 .../state/RocksDBKeyedStateBackend.java         |  2 +-
 .../common/typeutils/CompatibilityResult.java   | 36 ++++++++++++++------
 .../api/common/typeutils/TypeSerializer.java    |  9 +++--
 .../common/typeutils/base/EnumSerializer.java   |  2 +-
 .../typeutils/base/GenericArraySerializer.java  |  6 ++--
 .../common/typeutils/base/ListSerializer.java   |  4 +--
 .../common/typeutils/base/MapSerializer.java    |  4 +--
 .../typeutils/base/TypeSerializerSingleton.java |  2 +-
 .../java/typeutils/runtime/AvroSerializer.java  |  4 +--
 .../runtime/CopyableValueSerializer.java        |  2 +-
 .../typeutils/runtime/EitherSerializer.java     |  4 +--
 .../java/typeutils/runtime/PojoSerializer.java  | 16 ++++-----
 .../java/typeutils/runtime/RowSerializer.java   |  6 ++--
 .../typeutils/runtime/TupleSerializerBase.java  |  6 ++--
 .../java/typeutils/runtime/ValueSerializer.java |  2 +-
 .../typeutils/runtime/kryo/KryoSerializer.java  |  4 +--
 .../common/typeutils/SerializerTestBase.java    |  4 +--
 .../typeutils/base/EnumSerializerTest.java      |  6 ++--
 .../typeutils/runtime/PojoSerializerTest.java   | 10 +++---
 .../kryo/KryoSerializerCompatibilityTest.java   |  4 +--
 .../AbstractKeyedCEPPatternOperator.java        |  4 +--
 .../table/runtime/types/CRowSerializer.scala    |  6 ++--
 .../runtime/state/ArrayListSerializer.java      |  4 +--
 .../flink/runtime/state/HashMapSerializer.java  |  4 +--
 .../flink/runtime/state/StateMigrationUtil.java |  4 +--
 .../api/scala/typeutils/EitherSerializer.scala  |  8 +++--
 .../scala/typeutils/EnumValueSerializer.scala   |  6 ++--
 .../api/scala/typeutils/OptionSerializer.scala  |  6 ++--
 .../api/scala/typeutils/TrySerializer.scala     |  6 ++--
 .../MultiplexingStreamRecordSerializer.java     |  4 +--
 .../streamrecord/StreamRecordSerializer.java    |  6 ++--
 .../streamrecord/StreamElementSerializer.java   |  4 +--
 33 files changed, 109 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 1a02e7b..421d7a3 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -171,7 +171,7 @@ public final class WritableSerializer<T extends Writable> extends TypeSerializer
 
 			return CompatibilityResult.compatible();
 		} else {
-			return CompatibilityResult.requiresMigration(null);
+			return CompatibilityResult.requiresMigration();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f5dddd6..6af53c3 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1520,7 +1520,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					restoredMetaInfo.getStateSerializerConfigSnapshot(),
 					newMetaInfo.getStateSerializer());
 
-			if (!namespaceCompatibility.requiresMigration() && !stateCompatibility.requiresMigration()) {
+			if (!namespaceCompatibility.isRequiresMigration() && !stateCompatibility.isRequiresMigration()) {
 				stateInfo.f1 = newMetaInfo;
 				return stateInfo.f0;
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
index 891cfe0..5ad0b5e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
 
 /**
  * A {@code CompatibilityResult} contains information about whether or not data migration
@@ -41,7 +42,7 @@ public final class CompatibilityResult<T> {
 	private final TypeDeserializer<T> convertDeserializer;
 
 	/**
-	 * Returns a strategy that signals that the new serializer is compatible and no migration is required.
+	 * Returns a result that signals that the new serializer is compatible and no migration is required.
 	 *
 	 * @return a result that signals migration is not required for the new serializer
 	 */
@@ -50,19 +51,32 @@ public final class CompatibilityResult<T> {
 	}
 
 	/**
-	 * Returns a strategy that signals migration to be performed.
+	 * Returns a result that signals that migration must be performed. If the preceding serializer cannot
+	 * be found or restored to read the previous data during migration, the provided convert deserializer
+	 * is used instead.
 	 *
-	 * <p>Furthermore, in the case that the preceding serializer cannot be found or restored to read the
-	 * previous data during migration, a provided convert deserializer can be used (may be {@code null}
-	 * if one cannot be provided).
+	 * @param convertDeserializer the convert deserializer to use, in the case that the preceding serializer
+	 *                            cannot be found.
 	 *
-	 * <p>In the case that the preceding serializer cannot be found and a convert deserializer is not
-	 * provided, the migration will fail due to the incapability of reading previous data.
-	 *
-	 * @return a result that signals migration is necessary, possibly providing a convert deserializer.
+	 * @return a result that signals migration is necessary, also providing a convert deserializer.
 	 */
 	public static <T> CompatibilityResult<T> requiresMigration(TypeDeserializer<T> convertDeserializer) {
-		return new CompatibilityResult<>(true, convertDeserializer);
+		Preconditions.checkNotNull(convertDeserializer, "Convert deserializer cannot be null.");
+
+		return new CompatibilityResult<>(true, Preconditions.checkNotNull(convertDeserializer));
+	}
+
+	/**
+	 * Returns a result that signals migration to be performed. The migration will fail if the preceding
+	 * serializer for the previous data cannot be found.
+	 *
+	 * <p>You can also provide a convert deserializer using {@link #requiresMigration(TypeDeserializer)},
+	 * which will be used as a fallback in such cases.
+	 *
+	 * @return a result that signals migration is necessary, without providing a convert deserializer.
+	 */
+	public static <T> CompatibilityResult<T> requiresMigration() {
+		return new CompatibilityResult<>(true, null);
 	}
 
 	private CompatibilityResult(boolean requiresMigration, TypeDeserializer<T> convertDeserializer) {
@@ -74,7 +88,7 @@ public final class CompatibilityResult<T> {
 		return convertDeserializer;
 	}
 
-	public boolean requiresMigration() {
+	public boolean isRequiresMigration() {
 		return requiresMigration;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 0b5a08a..85cbfdb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -201,14 +201,19 @@ public abstract class TypeSerializer<T> implements TypeDeserializer<T>, Serializ
 	 *     migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be
 	 *     compatible, for previous data. Furthermore, in the case that the preceding serializer cannot be found or
 	 *     restored to read the previous data to perform the migration, the provided convert deserializer can be
-	 *     used (may be {@code null} if one cannot be provided).</li>
+	 *     used as a fallback.</li>
+	 *
+	 *     <li>{@link CompatibilityResult#requiresMigration()}: this signals Flink that migration needs to be
+	 *     performed, because this serializer is not compatible, or cannot be reconfigured to be compatible, for
+	 *     previous data. If the preceding serializer cannot be found (either its implementation changed or it was
+	 *     removed from the classpath) then the migration will fail because the previous data cannot be read.</li>
 	 * </ul>
 	 *
 	 * @see CompatibilityResult
 	 *
 	 * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
 	 *
-	 * @return the determined compatibility result.
+	 * @return the determined compatibility result (cannot be {@code null}).
 	 */
 	public abstract CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index 2f74d84..d9246ae 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -205,7 +205,7 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 3e592b4..54c604c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -208,8 +208,8 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 				CompatibilityResult<C> compatResult = componentSerializer.ensureCompatibility(
 					config.getSingleNestedSerializerConfigSnapshot());
 
-				if (!compatResult.requiresMigration()) {
-					return CompatibilityResult.requiresMigration(null);
+				if (!compatResult.isRequiresMigration()) {
+					return CompatibilityResult.compatible();
 				} else if (compatResult.getConvertDeserializer() != null) {
 					return CompatibilityResult.requiresMigration(
 						new GenericArraySerializer<>(
@@ -219,6 +219,6 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index 1b6540c..aa9808e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -185,7 +185,7 @@ public final class ListSerializer<T> extends TypeSerializer<List<T>> {
 			CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility(
 				((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
 
-			if (!compatResult.requiresMigration()) {
+			if (!compatResult.isRequiresMigration()) {
 				return CompatibilityResult.compatible();
 			} else if (compatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
@@ -193,6 +193,6 @@ public final class ListSerializer<T> extends TypeSerializer<List<T>> {
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index 182fff6..d5d6ec8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -216,7 +216,7 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
 			CompatibilityResult<K> keyCompatResult = keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]);
 			CompatibilityResult<V> valueCompatResult = valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]);
 
-			if (!keyCompatResult.requiresMigration() && !valueCompatResult.requiresMigration()) {
+			if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration()) {
 				return CompatibilityResult.compatible();
 			} else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
@@ -226,6 +226,6 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index c5decc5..9354af0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -66,7 +66,7 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
 
 			return CompatibilityResult.compatible();
 		} else {
-			return CompatibilityResult.requiresMigration(null);
+			return CompatibilityResult.requiresMigration();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index c9eeb34..565bd4d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -238,7 +238,7 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 
 				for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
 					if (reconfiguredRegistrationEntry.getValue().isDummy()) {
-						return CompatibilityResult.requiresMigration(null);
+						return CompatibilityResult.requiresMigration();
 					}
 				}
 
@@ -249,7 +249,7 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 
 		// ends up here if the preceding serializer is not
 		// the ValueSerializer, or serialized data type has changed
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 
 	public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index 46b93c2..b903969 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -147,7 +147,7 @@ public final class CopyableValueSerializer<T extends CopyableValue<T>> extends T
 				&& valueClass.equals(((CopyableValueSerializerConfigSnapshot) configSnapshot).getTypeClass())) {
 			return CompatibilityResult.compatible();
 		} else {
-			return CompatibilityResult.requiresMigration(null);
+			return CompatibilityResult.requiresMigration();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index 461dd87..4373ee0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -207,7 +207,7 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
 			CompatibilityResult<L> leftCompatResult = leftSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[0]);
 			CompatibilityResult<R> rightCompatResult = rightSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[1]);
 
-			if (!leftCompatResult.requiresMigration() && !rightCompatResult.requiresMigration()) {
+			if (!leftCompatResult.isRequiresMigration() && !rightCompatResult.isRequiresMigration()) {
 				return CompatibilityResult.compatible();
 			} else {
 				if (leftCompatResult.getConvertDeserializer() != null && rightCompatResult.getConvertDeserializer() != null) {
@@ -219,6 +219,6 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 08da49e..a8368c4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -580,13 +580,13 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 							reorderedFields[i] = fieldToConfigSnapshotEntry.getKey();
 
 							compatResult = fieldSerializers[fieldIndex].ensureCompatibility(fieldToConfigSnapshotEntry.getValue());
-							if (compatResult.requiresMigration()) {
-								return CompatibilityResult.requiresMigration(null);
+							if (compatResult.isRequiresMigration()) {
+								return CompatibilityResult.requiresMigration();
 							} else {
 								reorderedFieldSerializers[i] = fieldSerializers[fieldIndex];
 							}
 						} else {
-							return CompatibilityResult.requiresMigration(null);
+							return CompatibilityResult.requiresMigration();
 						}
 
 						i++;
@@ -618,8 +618,8 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 					for (TypeSerializerConfigSnapshot previousRegisteredSerializerConfig : previousRegistrations.values()) {
 						// check compatibility of subclass serializer
 						compatResult = reorderedRegisteredSubclassSerializers[i].ensureCompatibility(previousRegisteredSerializerConfig);
-						if (compatResult.requiresMigration()) {
-							return CompatibilityResult.requiresMigration(null);
+						if (compatResult.isRequiresMigration()) {
+							return CompatibilityResult.requiresMigration();
 						}
 
 						i++;
@@ -638,8 +638,8 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 						// check compatibility of cached subclass serializer
 						compatResult = cachedSerializer.ensureCompatibility(previousCachedEntry.getValue());
-						if (compatResult.requiresMigration()) {
-							return CompatibilityResult.requiresMigration(null);
+						if (compatResult.isRequiresMigration()) {
+							return CompatibilityResult.requiresMigration();
 						} else {
 							rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer);
 						}
@@ -661,7 +661,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 
 	public static final class PojoSerializerConfigSnapshot<T> extends GenericTypeSerializerConfigSnapshot<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index 075c9d3..ba41d4b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -270,12 +270,12 @@ public final class RowSerializer extends TypeSerializer<Row> {
 				CompatibilityResult<?> compatResult;
 				for (int i = 0; i < fieldSerializers.length; i++) {
 					compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
-					if (compatResult.requiresMigration()) {
+					if (compatResult.isRequiresMigration()) {
 						requireMigration = true;
 
 						if (compatResult.getConvertDeserializer() == null) {
 							// one of the field serializers cannot provide a fallback deserializer
-							return CompatibilityResult.requiresMigration(null);
+							return CompatibilityResult.requiresMigration();
 						} else {
 							convertDeserializers[i] =
 								new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
@@ -291,7 +291,7 @@ public final class RowSerializer extends TypeSerializer<Row> {
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 
 	public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 68d5aa8..032c3f1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -145,8 +145,8 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 					CompatibilityResult compatResult;
 					for (int i = 0; i < fieldSerializers.length; i++) {
 						compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
-						if (compatResult.requiresMigration()) {
-							return CompatibilityResult.requiresMigration(null);
+						if (compatResult.isRequiresMigration()) {
+							return CompatibilityResult.requiresMigration();
 						}
 					}
 
@@ -155,6 +155,6 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 10e2330..0a028eb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -193,7 +193,7 @@ public final class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 
 	public static class ValueSerializerConfigSnapshot<T extends Value> extends KryoRegistrationSerializerConfigSnapshot<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index a172b72..655de76 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -399,7 +399,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 							"proper serializer, because its previous serializer cannot be loaded or is no " +
 							"longer valid but a new serializer is not available", reconfiguredRegistrationEntry.getKey());
 
-						return CompatibilityResult.requiresMigration(null);
+						return CompatibilityResult.requiresMigration();
 					}
 				}
 
@@ -410,7 +410,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 
 	public static final class KryoSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index f2879ac..73c4379 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -121,11 +121,11 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 		}
 
 		CompatibilityResult strategy = getSerializer().ensureCompatibility(restoredConfig);
-		assertFalse(strategy.requiresMigration());
+		assertFalse(strategy.isRequiresMigration());
 
 		// also verify that the serializer's reconfigure implementation detects incompatibility
 		strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot());
-		assertTrue(strategy.requiresMigration());
+		assertTrue(strategy.isRequiresMigration());
 	}
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
index 5c615de..16ea945 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
@@ -74,7 +74,7 @@ public class EnumSerializerTest extends TestLogger {
 		// reconfigure and verify compatibility
 		CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility(
 			new EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, mockPreviousOrder));
-		assertFalse(compatResult.requiresMigration());
+		assertFalse(compatResult.isRequiresMigration());
 
 		// after reconfiguration, the order should be first the original BAR, PAULA, NATHANIEL,
 		// followed by the "new enum constants" FOO, PETER, EMMA
@@ -107,7 +107,7 @@ public class EnumSerializerTest extends TestLogger {
 		}
 
 		CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility(restoredConfig);
-		assertFalse(compatResult.requiresMigration());
+		assertFalse(compatResult.isRequiresMigration());
 
 		assertEquals(PublicEnum.FOO.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
 		assertEquals(PublicEnum.BAR.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
@@ -163,7 +163,7 @@ public class EnumSerializerTest extends TestLogger {
 		// reconfigure and verify compatibility
 		CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility(
 			new EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, mockPreviousOrder));
-		assertFalse(compatResult.requiresMigration());
+		assertFalse(compatResult.isRequiresMigration());
 
 		// serialize and deserialize again the serializer
 		byte[] serializedSerializer = InstantiationUtil.serializeObject(serializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 5459d53..c77ffcc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -300,7 +300,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		}
 
 		CompatibilityResult<SubTestUserClassA> compatResult = pojoSerializer2.ensureCompatibility(pojoSerializerConfigSnapshot);
-		assertTrue(compatResult.requiresMigration());
+		assertTrue(compatResult.isRequiresMigration());
 	}
 
 	/**
@@ -340,7 +340,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		}
 
 		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
-		assertTrue(!compatResult.requiresMigration());
+		assertTrue(!compatResult.isRequiresMigration());
 
 		// reconfigure - check reconfiguration result and that registration ids remains the same
 		//assertEquals(ReconfigureResult.COMPATIBLE, pojoSerializer.reconfigure(pojoSerializerConfigSnapshot));
@@ -384,7 +384,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 
 		// reconfigure - check reconfiguration result and that subclass serializer cache is repopulated
 		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
-		assertFalse(compatResult.requiresMigration());
+		assertFalse(compatResult.isRequiresMigration());
 		assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
 		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
 		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
@@ -446,7 +446,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		// 1) subclass serializer cache is repopulated
 		// 2) registrations also contain the now registered subclasses
 		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
-		assertFalse(compatResult.requiresMigration());
+		assertFalse(compatResult.isRequiresMigration());
 		assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
 		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
 		assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
@@ -501,7 +501,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(
 
 			mockPreviousConfigSnapshot);
-		assertFalse(compatResult.requiresMigration());
+		assertFalse(compatResult.isRequiresMigration());
 		int i = 0;
 		for (Field field : mockOriginalFieldOrder) {
 			assertEquals(field, pojoSerializer.getFields()[i]);

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 60c4dc4..860c560 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -66,7 +66,7 @@ public class KryoSerializerCompatibilityTest {
 		}
 
 		CompatibilityResult<TestClassB> compatResult = kryoSerializerForB.ensureCompatibility(kryoSerializerConfigSnapshot);
-		assertTrue(compatResult.requiresMigration());
+		assertTrue(compatResult.isRequiresMigration());
 	}
 
 	/**
@@ -110,7 +110,7 @@ public class KryoSerializerCompatibilityTest {
 
 		// reconfigure - check reconfiguration result and that registration id remains the same
 		CompatibilityResult<TestClass> compatResult = kryoSerializer.ensureCompatibility(kryoSerializerConfigSnapshot);
-		assertFalse(compatResult.requiresMigration());
+		assertFalse(compatResult.isRequiresMigration());
 		assertEquals(testClassId, kryoSerializer.getKryo().getRegistration(TestClass.class).getId());
 		assertEquals(testClassAId, kryoSerializer.getKryo().getRegistration(TestClassA.class).getId());
 		assertEquals(testClassBId, kryoSerializer.getKryo().getRegistration(TestClassB.class).getId());

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 140e091..3afe397 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -513,7 +513,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 				CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility(
 						((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
 
-				if (!compatResult.requiresMigration()) {
+				if (!compatResult.isRequiresMigration()) {
 					return CompatibilityResult.compatible();
 				} else if (compatResult.getConvertDeserializer() != null) {
 					return CompatibilityResult.requiresMigration(
@@ -522,7 +522,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 				}
 			}
 
-			return CompatibilityResult.requiresMigration(null);
+			return CompatibilityResult.requiresMigration();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index 122f4fb..0fd3680 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -93,20 +93,20 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
         val compatResult = rowSerializer.ensureCompatibility(
             crowSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot)
 
-        if (compatResult.requiresMigration()) {
+        if (compatResult.isRequiresMigration) {
           if (compatResult.getConvertDeserializer != null) {
             CompatibilityResult.requiresMigration(
               new CRowSerializer(
                 new TypeDeserializerAdapter(compatResult.getConvertDeserializer))
             )
           } else {
-            CompatibilityResult.requiresMigration(null)
+            CompatibilityResult.requiresMigration()
           }
         } else {
           CompatibilityResult.compatible()
         }
 
-      case _ => CompatibilityResult.requiresMigration(null)
+      case _ => CompatibilityResult.requiresMigration()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index c39cb9b..56eb7ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -152,7 +152,7 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 			CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility(
 				((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
 
-			if (!compatResult.requiresMigration()) {
+			if (!compatResult.isRequiresMigration()) {
 				return CompatibilityResult.compatible();
 			} else if (compatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
@@ -160,6 +160,6 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
index 925fe78..b93c9e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
@@ -217,7 +217,7 @@ public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>>
 			CompatibilityResult<K> keyCompatResult = keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]);
 			CompatibilityResult<V> valueCompatResult = valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]);
 
-			if (!keyCompatResult.requiresMigration() && !valueCompatResult.requiresMigration()) {
+			if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration()) {
 				return CompatibilityResult.compatible();
 			} else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
@@ -227,6 +227,6 @@ public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>>
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
index 978f28d..39bb743 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
@@ -48,7 +48,7 @@ public class StateMigrationUtil {
 	 *
 	 * @param <T> Type of the data handled by the serializers
 	 *
-	 * @return the final resolved compatiblity result
+	 * @return the final resolved compatibility result
 	 */
 	public static <T> CompatibilityResult<T> resolveCompatibilityResult(
 			TypeSerializer<T> precedingSerializer,
@@ -59,7 +59,7 @@ public class StateMigrationUtil {
 		if (precedingSerializerConfigSnapshot != null) {
 			CompatibilityResult<T> initialResult = newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);
 
-			if (!initialResult.requiresMigration()) {
+			if (!initialResult.isRequiresMigration()) {
 				return initialResult;
 			} else {
 				if (precedingSerializer != null && !(precedingSerializer.getClass().equals(dummySerializerClassTag))) {

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 88b2041..6c4378a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -127,7 +127,9 @@ class EitherSerializer[A, B, T <: Either[A, B]](
         val leftCompatResult = leftSerializer.ensureCompatibility(leftRightConfigs(0))
         val rightCompatResult = rightSerializer.ensureCompatibility(leftRightConfigs(1))
 
-        if (leftCompatResult.requiresMigration || rightCompatResult.requiresMigration) {
+        if (leftCompatResult.isRequiresMigration
+            || rightCompatResult.isRequiresMigration) {
+
           if (leftCompatResult.getConvertDeserializer != null
               && rightCompatResult.getConvertDeserializer != null) {
 
@@ -139,13 +141,13 @@ class EitherSerializer[A, B, T <: Either[A, B]](
             )
 
           } else {
-            CompatibilityResult.requiresMigration(null)
+            CompatibilityResult.requiresMigration()
           }
         } else {
           CompatibilityResult.compatible()
         }
 
-      case _ => CompatibilityResult.requiresMigration(null)
+      case _ => CompatibilityResult.requiresMigration()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index dc96c98..843079a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -95,16 +95,16 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
             // and original constants must be in the exact same order
 
             if (currentEnumConstants(i) != enumSerializerConfigSnapshot.getEnumConstants(i)) {
-              CompatibilityResult.requiresMigration(null)
+              return CompatibilityResult.requiresMigration()
             }
           }
 
           CompatibilityResult.compatible()
         } else {
-          CompatibilityResult.requiresMigration(null)
+          CompatibilityResult.requiresMigration()
         }
 
-      case _ => CompatibilityResult.requiresMigration(null)
+      case _ => CompatibilityResult.requiresMigration()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 81b3bcc..4b56059 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -111,19 +111,19 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
         val compatResult = elemSerializer.ensureCompatibility(
           optionSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot)
 
-        if (compatResult.requiresMigration()) {
+        if (compatResult.isRequiresMigration) {
           if (compatResult.getConvertDeserializer != null) {
             CompatibilityResult.requiresMigration(
               new OptionSerializer[A](
                 new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
           } else {
-            CompatibilityResult.requiresMigration(null)
+            CompatibilityResult.requiresMigration()
           }
         } else {
           CompatibilityResult.compatible()
         }
 
-      case _ => CompatibilityResult.requiresMigration(null)
+      case _ => CompatibilityResult.requiresMigration()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index c864dc7..5de76ca 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -123,13 +123,13 @@ class TrySerializer[A](
         val throwableCompatRes =
           throwableSerializer.ensureCompatibility(serializerConfigSnapshots(1))
 
-        if (elemCompatRes.requiresMigration() || throwableCompatRes.requiresMigration()) {
-          CompatibilityResult.requiresMigration(null)
+        if (elemCompatRes.isRequiresMigration || throwableCompatRes.isRequiresMigration) {
+          CompatibilityResult.requiresMigration()
         } else {
           CompatibilityResult.compatible()
         }
 
-      case _ => CompatibilityResult.requiresMigration(null)
+      case _ => CompatibilityResult.requiresMigration()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 552ffd0..dc23b8d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -225,7 +225,7 @@ public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Stream
 			CompatibilityResult<T> compatResult = typeSerializer.ensureCompatibility(
 				((MultiplexingStreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
 
-			if (!compatResult.requiresMigration()) {
+			if (!compatResult.isRequiresMigration()) {
 				return CompatibilityResult.compatible();
 			} else if (compatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
@@ -234,7 +234,7 @@ public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Stream
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
index f7a661e..7b0390d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -164,8 +164,8 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 			CompatibilityResult<T> compatResult = typeSerializer.ensureCompatibility(
 				((StreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
 
-			if (!compatResult.requiresMigration()) {
-				return CompatibilityResult.requiresMigration(null);
+			if (!compatResult.isRequiresMigration()) {
+				return CompatibilityResult.compatible();
 			} else if (compatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
 					new StreamRecordSerializer<>(
@@ -173,7 +173,7 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/947c44e8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index e444ced..ba69fed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -286,7 +286,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 			CompatibilityResult<T> compatResult = typeSerializer.ensureCompatibility(
 				((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
 
-			if (!compatResult.requiresMigration()) {
+			if (!compatResult.isRequiresMigration()) {
 				return CompatibilityResult.compatible();
 			} else if (compatResult.getConvertDeserializer() != null) {
 				return CompatibilityResult.requiresMigration(
@@ -295,7 +295,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 			}
 		}
 
-		return CompatibilityResult.requiresMigration(null);
+		return CompatibilityResult.requiresMigration();
 	}
 
 	/**


[2/3] flink git commit: [FLINK-6566] [core] More restricted interface for VersionedIOReadableWritable hooks

Posted by tz...@apache.org.
[FLINK-6566] [core] More restricted interface for VersionedIOReadableWritable hooks

This commit makes the method hooks for defining compatible
serialization versions of VersionedIOReadableWritables more restricted.

Functionally everything remains the same, but with less room for
error-prone user implementations. It also allows for a better error
message to indicate version mismatch.

This closes #3883.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/347100de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/347100de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/347100de

Branch: refs/heads/master
Commit: 347100de7527dc4ba3664b8e8306a081834f84a4
Parents: c594af0
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 12 20:06:01 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat May 13 14:45:25 2017 +0800

----------------------------------------------------------------------
 .../typeutils/TypeSerializerConfigSnapshot.java | 23 ---------
 .../core/io/VersionedIOReadableWritable.java    | 53 ++++++++++++--------
 .../flink/core/io/VersionedIOWriteableTest.java | 16 ++----
 .../state/KeyedBackendSerializationProxy.java   | 20 ++------
 .../OperatorBackendSerializationProxy.java      | 17 ++-----
 .../state/heap/HeapKeyedStateBackend.java       |  2 +-
 6 files changed, 42 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/347100de/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
index 27369b9..389d141 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.util.Preconditions;
 
@@ -51,22 +50,6 @@ public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWr
 	/** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */
 	private ClassLoader userCodeClassLoader;
 
-	/** The snapshot version of this configuration. */
-	private Integer snapshotVersion;
-
-	/**
-	 * Returns the version of the configuration at the time its snapshot was taken.
-	 *
-	 * @return the snapshot configuration's version.
-	 */
-	public int getSnapshotVersion() {
-		if (snapshotVersion == null) {
-			return getVersion();
-		} else {
-			return snapshotVersion;
-		}
-	}
-
 	/**
 	 * Set the user code class loader.
 	 * Only relevant if this configuration instance was deserialized from binary form.
@@ -91,12 +74,6 @@ public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWr
 		return userCodeClassLoader;
 	}
 
-	@Override
-	protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
-		super.resolveVersionRead(foundVersion);
-		this.snapshotVersion = foundVersion;
-	}
-
 	public abstract boolean equals(Object obj);
 
 	public abstract int hashCode();

http://git-wip-us.apache.org/repos/asf/flink/blob/347100de/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
index bad9cef..b4a0b2f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
@@ -18,20 +18,23 @@
 
 package org.apache.flink.core.io;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 /**
  * This is the abstract base class for {@link IOReadableWritable} which allows to differentiate between serialization
  * versions. Concrete subclasses should typically override the {@link #write(DataOutputView)} and
  * {@link #read(DataInputView)}, thereby calling super to ensure version checking.
  */
-@PublicEvolving
+@Internal
 public abstract class VersionedIOReadableWritable implements IOReadableWritable, Versioned {
 
+	private int readVersion = Integer.MIN_VALUE;
+
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		out.writeInt(getVersion());
@@ -39,34 +42,42 @@ public abstract class VersionedIOReadableWritable implements IOReadableWritable,
 
 	@Override
 	public void read(DataInputView in) throws IOException {
-		int foundVersion = in.readInt();
-		resolveVersionRead(foundVersion);
+		this.readVersion = in.readInt();
+		resolveVersionRead(readVersion);
 	}
 
 	/**
-	 * This method is a hook to react on the version tag that we find during read. This can also be used to initialize
-	 * further read logic w.r.t. the version at hand.
-	 * Default implementation of this method just checks the compatibility of a version number against the own version.
+	 * Returns the found serialization version. If this instance was not read from serialized bytes
+	 * but simply instantiated, then the current version is returned.
 	 *
-	 * @param foundVersion the version found from reading the input stream
-	 * @throws VersionMismatchException thrown when serialization versions mismatch
+	 * @return the read serialization version, or the current version if the instance was not read from bytes.
 	 */
-	protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
-		if (!isCompatibleVersion(foundVersion)) {
-			int expectedVersion = getVersion();
-			throw new VersionMismatchException(
-					"Incompatible version: found " + foundVersion + ", required " + expectedVersion);
-		}
+	public int getReadVersion() {
+		return (readVersion == Integer.MIN_VALUE) ? getVersion() : readVersion;
 	}
 
 	/**
-	 * Checks for compatibility between this and the found version. Subclasses can override this methods in case of
-	 * intended backwards backwards compatibility.
+	 * Returns the compatible version values.
 	 *
-	 * @param version version number to compare against.
-	 * @return true, iff this is compatible to the passed version.
+	 * <p>By default, the base implementation recognizes only the current version (identified by {@link #getVersion()})
+	 * as compatible. This method can be used as a hook and may be overridden to identify more compatible versions.
+	 *
+	 * @return an array of integers representing the compatible version values.
 	 */
-	public boolean isCompatibleVersion(int version) {
-		return getVersion() == version;
+	public int[] getCompatibleVersions() {
+		return new int[] {getVersion()};
+	}
+
+	private void resolveVersionRead(int readVersion) throws VersionMismatchException {
+
+		int[] compatibleVersions = getCompatibleVersions();
+		for (int compatibleVersion : compatibleVersions) {
+			if (compatibleVersion == readVersion) {
+				return;
+			}
+		}
+
+		throw new VersionMismatchException(
+			"Incompatible version: found " + readVersion + ", compatible versions are " + Arrays.toString(compatibleVersions));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/347100de/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java b/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
index b7b6d6f..ec5f792 100644
--- a/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
@@ -65,8 +65,8 @@ public class VersionedIOWriteableTest {
 
 		testWriteable = new TestWriteable(2) {
 			@Override
-			public boolean isCompatibleVersion(int version) {
-				return getVersion() >= version;
+			public int[] getCompatibleVersions() {
+				return new int[] {1, 2};
 			}
 		};
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
@@ -130,19 +130,9 @@ public class VersionedIOWriteableTest {
 			this.data = in.readUTF();
 		}
 
-		@Override
-		protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
-			super.resolveVersionRead(foundVersion);
-		}
-
-		@Override
-		public boolean isCompatibleVersion(int version) {
-			return super.isCompatibleVersion(version);
-		}
-
 		public String getData() {
 			return data;
 		}
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/347100de/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index a389c4f..a20628c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
-import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -41,7 +40,6 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 	private TypeSerializer<?> keySerializer;
 	private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
 
-	private int restoredVersion;
 	private ClassLoader userCodeClassLoader;
 
 	public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
@@ -57,8 +55,6 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 		Preconditions.checkNotNull(stateMetaInfoSnapshots);
 		Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE);
 		this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
-
-		this.restoredVersion = VERSION;
 	}
 
 	public List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> getStateMetaInfoSnapshots() {
@@ -74,20 +70,10 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 		return VERSION;
 	}
 
-	public int getRestoredVersion() {
-		return restoredVersion;
-	}
-
-	@Override
-	protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
-		super.resolveVersionRead(foundVersion);
-		this.restoredVersion = foundVersion;
-	}
-
 	@Override
-	public boolean isCompatibleVersion(int version) {
+	public int[] getCompatibleVersions() {
 		// we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x)
-		return super.isCompatibleVersion(version) || version == 2 || version == 1;
+		return new int[] {VERSION, 2, 1};
 	}
 
 	@Override
@@ -119,7 +105,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 		for (int i = 0; i < numKvStates; i++) {
 			stateMetaInfoSnapshots.add(
 				KeyedBackendStateMetaInfoSnapshotReaderWriters
-					.getReaderForVersion(restoredVersion, userCodeClassLoader)
+					.getReaderForVersion(getReadVersion(), userCodeClassLoader)
 					.readStateMetaInfo(in));
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/347100de/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
index 91d7aab..074d84e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -39,8 +38,6 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab
 	private List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> stateMetaInfoSnapshots;
 	private ClassLoader userCodeClassLoader;
 
-	private int restoredVersion;
-
 	public OperatorBackendSerializationProxy(ClassLoader userCodeClassLoader) {
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 	}
@@ -50,8 +47,6 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab
 
 		this.stateMetaInfoSnapshots = Preconditions.checkNotNull(stateMetaInfoSnapshots);
 		Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE);
-
-		this.restoredVersion = VERSION;
 	}
 
 	@Override
@@ -60,15 +55,9 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab
 	}
 
 	@Override
-	protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
-		super.resolveVersionRead(foundVersion);
-		this.restoredVersion = foundVersion;
-	}
-
-	@Override
-	public boolean isCompatibleVersion(int version) {
+	public int[] getCompatibleVersions() {
 		// we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x)
-		return super.isCompatibleVersion(version) || version == 1;
+		return new int[] {VERSION, 1};
 	}
 
 	@Override
@@ -92,7 +81,7 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab
 		for (int i = 0; i < numKvStates; i++) {
 			stateMetaInfoSnapshots.add(
 				OperatorBackendStateMetaInfoSnapshotReaderWriters
-					.getReaderForVersion(restoredVersion, userCodeClassLoader)
+					.getReaderForVersion(getReadVersion(), userCodeClassLoader)
 					.readStateMetaInfo(in));
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/347100de/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index bc314df..11e7760 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -448,7 +448,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						StateTableByKeyGroupReader keyGroupReader =
 								StateTableByKeyGroupReaders.readerForVersion(
 										stateTable,
-										serializationProxy.getRestoredVersion());
+										serializationProxy.getReadVersion());
 
 						keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
 					}