Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/13 05:17:29 UTC

[10/15] flink git commit: [FLINK-6803] [tests] Add test for PojoSerializer state upgrade

[FLINK-6803] [tests] Add test for PojoSerializer state upgrade

The added PojoSerializerUpgradeTest tests the state migration behaviour when the
underlying POJO type changes and one tries to recover from old state. Currently,
not all tests could be activated, because there are still some pending issues that
need to be fixed first. We should enable these tests once those issues have been fixed.
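For orientation, here is a minimal sketch (not part of this commit) of the serializer
compatibility check that these tests ultimately exercise, using the Flink 1.3 upgrade
API (snapshotConfiguration / ensureCompatibility). The MyPojo class is a hypothetical
stand-in for the dynamically compiled Pojo sources in the test; with an unchanged class
the check passes, and the test forces a mismatch by recompiling Pojo with a different
field layout between writing the snapshot and restoring from it:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class PojoCompatibilitySketch {

	// Hypothetical stand-in for the compiled "Pojo" sources used by the test.
	public static class MyPojo {
		public long a;
		public String b;
	}

	public static void main(String[] args) {
		ExecutionConfig config = new ExecutionConfig();

		// Serializer for the "old" POJO layout; its config snapshot is what a
		// checkpoint carries alongside the serialized state.
		TypeSerializer<MyPojo> oldSerializer =
			TypeExtractor.createTypeInfo(MyPojo.class).createSerializer(config);
		TypeSerializerConfigSnapshot oldSnapshot = oldSerializer.snapshotConfiguration();

		// On restore, the new serializer is asked whether it can read the old data.
		TypeSerializer<MyPojo> newSerializer =
			TypeExtractor.createTypeInfo(MyPojo.class).createSerializer(config);
		CompatibilityResult<MyPojo> result = newSerializer.ensureCompatibility(oldSnapshot);

		// When migration would be required (e.g. a field changed its type), the
		// state backends in this commit now surface a StateMigrationException
		// instead of a generic RuntimeException.
		System.out.println("requires migration: " + result.isRequiresMigration());
	}
}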


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d34af34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d34af34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d34af34

Branch: refs/heads/master
Commit: 2d34af345e241b23489b6bbdd2a752243c1e44fd
Parents: e35c575
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 31 18:37:12 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:38:17 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |   5 +-
 .../state/RocksDBStateBackendTest.java          |   2 +-
 .../api/common/typeutils/CompatibilityUtil.java |   6 +-
 .../java/typeutils/runtime/FieldSerializer.java |   2 +-
 .../flink/util/StateMigrationException.java     |  38 ++
 .../state/heap/HeapKeyedStateBackend.java       |   3 +-
 .../runtime/state/StateBackendTestBase.java     | 133 ++----
 flink-streaming-java/pom.xml                    |   1 -
 .../util/AbstractStreamOperatorTestHarness.java |   1 -
 flink-tests/pom.xml                             |   8 +
 .../PojoSerializerUpgradeTest.java              | 445 +++++++++++++++++++
 11 files changed, 549 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 241c0b3..758e894 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -79,6 +79,7 @@ import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -1506,7 +1507,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 */
 	@SuppressWarnings("rawtypes, unchecked")
 	protected <N, S> ColumnFamilyHandle getColumnFamily(
-			StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {
+			StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
 
 		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
 				kvStateInformation.get(descriptor.getName());
@@ -1557,7 +1558,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				return stateInfo.f0;
 			} else {
 				// TODO state migration currently isn't possible.
-				throw new RuntimeException("State migration currently isn't supported.");
+				throw new StateMigrationException("State migration isn't supported, yet.");
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 8b44a47..cf363fa 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -96,7 +96,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	private ValueState<Integer> testState1;
 	private ValueState<String> testState2;
 
-	@Parameterized.Parameters
+	@Parameterized.Parameters(name = "Incremental checkpointing: {0}")
 	public static Collection<Boolean> parameters() {
 		return Arrays.asList(false, true);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
index 8959628..94bb9bd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -19,6 +19,8 @@ package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.annotation.Internal;
 
+import javax.annotation.Nullable;
+
 /**
  * Utilities related to serializer compatibility.
  */
@@ -40,7 +42,7 @@ public class CompatibilityUtil {
 	 *      If yes, use that for state migration and simply return the result.
 	 *   6. If all of above fails, state migration is required but could not be performed; throw exception.
 	 *
-	 * @param precedingSerializer the preceding serializer used to write the data
+	 * @param precedingSerializer the preceding serializer used to write the data, null if none could be retrieved
 	 * @param dummySerializerClassTag any class tags that identifies the preceding serializer as a dummy placeholder
 	 * @param precedingSerializerConfigSnapshot configuration snapshot of the preceding serializer
 	 * @param newSerializer the new serializer to ensure compatibility with
@@ -51,7 +53,7 @@ public class CompatibilityUtil {
 	 */
 	@SuppressWarnings("unchecked")
 	public static <T> CompatibilityResult<T> resolveCompatibilityResult(
-			TypeSerializer<?> precedingSerializer,
+			@Nullable TypeSerializer<?> precedingSerializer,
 			Class<?> dummySerializerClassTag,
 			TypeSerializerConfigSnapshot precedingSerializerConfigSnapshot,
 			TypeSerializer<T> newSerializer) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
index 5519889..56a4445 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
@@ -51,7 +51,7 @@ public class FieldSerializer {
 				clazz = clazz.getSuperclass();
 			}
 		}
-		throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
+		throw new IOException("Class resolved at TaskManager is not compatible with class read during Plan setup."
 				+ " (" + fieldName + ")");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-core/src/main/java/org/apache/flink/util/StateMigrationException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StateMigrationException.java b/flink-core/src/main/java/org/apache/flink/util/StateMigrationException.java
new file mode 100644
index 0000000..1667ff5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/StateMigrationException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Base class for state migration related exceptions
+ */
+public class StateMigrationException extends FlinkException {
+	private static final long serialVersionUID = 8268516412747670839L;
+
+	public StateMigrationException(String message) {
+		super(message);
+	}
+
+	public StateMigrationException(Throwable cause) {
+		super(cause);
+	}
+
+	public StateMigrationException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index d4ba204..2ab9691 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -399,7 +400,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						.isRequiresMigration()) {
 
 						// TODO replace with state migration; note that key hash codes need to remain the same after migration
-						throw new IllegalStateException("The new key serializer is not compatible to read previous keys. " +
+						throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
 							"Aborting now since state migration is currently not available");
 					}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 8d4a38e..f08ad2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -68,6 +68,7 @@ import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -1804,53 +1805,44 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
-	public void testRestoreWithWrongKeySerializer() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
+	public void testRestoreWithWrongKeySerializer() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
 
-			// use an IntSerializer at first
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+		// use an IntSerializer at first
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
-			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
 
-			ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
-			// write some state
-			backend.setCurrentKey(1);
-			state.update("1");
-			backend.setCurrentKey(2);
-			state.update("2");
+		// write some state
+		backend.setCurrentKey(1);
+		state.update("1");
+		backend.setCurrentKey(2);
+		state.update("2");
 
-			// draw a snapshot
-			KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// draw a snapshot
+		KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			backend.dispose();
+		backend.dispose();
 
-			// restore with the wrong key serializer
-			try {
-				restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot1);
+		// restore with the wrong key serializer
+		try {
+			restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot1);
 
-				fail("should recognize wrong key serializer");
-			} catch (RuntimeException e) {
-				if (!e.getMessage().contains("The new key serializer is not compatible")) {
-					fail("wrong exception " + e);
-				}
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+			fail("should recognize wrong key serializer");
+		} catch (StateMigrationException ignored) {
+			// expected
 		}
 	}
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testValueStateRestoreWithWrongSerializers() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testValueStateRestoreWithWrongSerializers() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
+		try {
 			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
 
 			ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1880,29 +1872,21 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.value();
 
 				fail("should recognize wrong serializers");
-			} catch (RuntimeException e) {
-				if (!e.getMessage().contains("State migration currently isn't supported")) {
-					fail("wrong exception " + e);
-				}
+			} catch (StateMigrationException ignored) {
 				// expected
-			} catch (Exception e) {
-				fail("wrong exception " + e);
 			}
+		} finally {
 			backend.dispose();
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 	}
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testListStateRestoreWithWrongSerializers() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testListStateRestoreWithWrongSerializers() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
+		try {
 			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
 			ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1931,29 +1915,21 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.get();
 
 				fail("should recognize wrong serializers");
-			} catch (RuntimeException e) {
-				if (!e.getMessage().contains("State migration currently isn't supported")) {
-					fail("wrong exception " + e);
-				}
+			} catch (StateMigrationException ignored) {
 				// expected
-			} catch (Exception e) {
-				fail("wrong exception " + e);
 			}
+		} finally {
 			backend.dispose();
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 	}
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testReducingStateRestoreWithWrongSerializers() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testReducingStateRestoreWithWrongSerializers() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
+		try {
 			ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id",
 					new AppendingReduce(),
 					StringSerializer.INSTANCE);
@@ -1984,29 +1960,21 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.get();
 
 				fail("should recognize wrong serializers");
-			} catch (RuntimeException e) {
-				if (!e.getMessage().contains("State migration currently isn't supported")) {
-					fail("wrong exception " + e);
-				}
+			} catch (StateMigrationException ignored) {
 				// expected
-			} catch (Exception e) {
-				fail("wrong exception " + e);
 			}
+		} finally {
 			backend.dispose();
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 	}
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testMapStateRestoreWithWrongSerializers() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testMapStateRestoreWithWrongSerializers() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
+		try {
 			MapStateDescriptor<String, String> kvId = new MapStateDescriptor<>("id", StringSerializer.INSTANCE, StringSerializer.INSTANCE);
 			MapState<String, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -2025,7 +1993,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			@SuppressWarnings("unchecked")
 			TypeSerializer<String> fakeStringSerializer =
-					(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+				(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
 
 			try {
 				kvId = new MapStateDescriptor<>("id", fakeStringSerializer, StringSerializer.INSTANCE);
@@ -2035,19 +2003,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.entries();
 
 				fail("should recognize wrong serializers");
-			} catch (RuntimeException e) {
-				if (!e.getMessage().contains("State migration currently isn't supported")) {
-					fail("wrong exception " + e);
-				}
+			} catch (StateMigrationException ignored) {
 				// expected
-			} catch (Exception e) {
-				fail("wrong exception " + e);
 			}
 			backend.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+		} finally {
+			backend.dispose();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-streaming-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index ab972a9..6976519 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -90,7 +90,6 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
-
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 6f2d349..fd781f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index dd6e949..4caf8a6 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -54,6 +54,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-optimizer_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
new file mode 100644
index 0000000..2769c50
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.typeserializerupgrade;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.StateMigrationException;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+@RunWith(Parameterized.class)
+public class PojoSerializerUpgradeTest extends TestLogger {
+
+	@Parameterized.Parameters(name = "StateBackend: {0}")
+	public static Collection<String> parameters () {
+		return Arrays.asList(
+			AbstractStateBackend.MEMORY_STATE_BACKEND_NAME,
+			AbstractStateBackend.FS_STATE_BACKEND_NAME,
+			AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME);
+	}
+
+	@ClassRule
+	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private StateBackend stateBackend;
+
+	public PojoSerializerUpgradeTest(String backendType) throws IOException, DynamicCodeLoadingException {
+		Configuration config = new Configuration();
+		config.setString(CoreOptions.STATE_BACKEND, backendType);
+		config.setString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, temporaryFolder.newFolder().toURI().toString());
+		stateBackend = AbstractStateBackend.loadStateBackendFromConfig(config, Thread.currentThread().getContextClassLoader(), null);
+	}
+
+	private static final String POJO_NAME = "Pojo";
+
+	private static final String SOURCE_A =
+		"import java.util.Objects;" +
+		"public class Pojo { " +
+		"private long a; " +
+		"private String b; " +
+		"public long getA() { return a;} " +
+		"public void setA(long value) { a = value; }" +
+		"public String getB() { return b; }" +
+		"public void setB(String value) { b = value; }" +
+		"@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b);} else { return false; }}" +
+		"@Override public int hashCode() { return Objects.hash(a, b); } " +
+		"@Override public String toString() {return \"(\" + a + \", \" + b + \")\";}}";
+
+	// changed order of fields which should be recoverable
+	private static final String SOURCE_B =
+		"import java.util.Objects;" +
+		"public class Pojo { " +
+		"private String b; " +
+		"private long a; " +
+		"public long getA() { return a;} " +
+		"public void setA(long value) { a = value; }" +
+		"public String getB() { return b; }" +
+		"public void setB(String value) { b = value; }" +
+		"@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b);} else { return false; }}" +
+		"@Override public int hashCode() { return Objects.hash(a, b); } " +
+		"@Override public String toString() {return \"(\" + a + \", \" + b + \")\";}}";
+
+	// changed type of a field which should not be recoverable
+	private static final String SOURCE_C =
+		"import java.util.Objects;" +
+		"public class Pojo { " +
+		"private double a; " +
+		"private String b; " +
+		"public double getA() { return a;} " +
+		"public void setA(double value) { a = value; }" +
+		"public String getB() { return b; }" +
+		"public void setB(String value) { b = value; }" +
+		"@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b);} else { return false; }}" +
+		"@Override public int hashCode() { return Objects.hash(a, b); } " +
+		"@Override public String toString() {return \"(\" + a + \", \" + b + \")\";}}";
+
+	// additional field which should not be recoverable
+	private static final String SOURCE_D =
+		"import java.util.Objects;" +
+		"public class Pojo { " +
+		"private long a; " +
+		"private String b; " +
+		"private double c; " +
+		"public long getA() { return a;} " +
+		"public void setA(long value) { a = value; }" +
+		"public String getB() { return b; }" +
+		"public void setB(String value) { b = value; }" +
+		"public double getC() { return c; } " +
+		"public void setC(double value) { c = value; }" +
+		"@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b) && c == other.c;} else { return false; }}" +
+		"@Override public int hashCode() { return Objects.hash(a, b, c); } " +
+		"@Override public String toString() {return \"(\" + a + \", \" + b + \", \" + c + \")\";}}";
+
+	// missing field which should not be recoverable
+	private static final String SOURCE_E =
+		"import java.util.Objects;" +
+		"public class Pojo { " +
+		"private long a; " +
+		"public long getA() { return a;} " +
+		"public void setA(long value) { a = value; }" +
+		"@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a;} else { return false; }}" +
+		"@Override public int hashCode() { return Objects.hash(a); } " +
+		"@Override public String toString() {return \"(\" + a + \")\";}}";
+
+	/**
+	 * We should be able to handle a changed field order
+	 */
+	@Test
+	public void testChangedFieldOrder() throws Exception {
+		testPojoSerializerUpgrade(SOURCE_A, SOURCE_B);
+	}
+
+	/**
+	 * Changing field types should require a state migration
+	 */
+	@Test
+	public void testChangedFieldTypes() throws Exception {
+		assumeTrue("Running only for RocksDBStateBackend until FLINK-6804 has been fixed.", stateBackend instanceof RocksDBStateBackend);
+		try {
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_C);
+			fail("Expected a state migration exception.");
+		} catch (Exception e) {
+			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
+				// StateMigrationException expected
+			} else {
+				throw e;
+			}
+		}
+	}
+
+	/**
+	 * Adding fields should require a state migration
+	 */
+	@Test
+	public void testAdditionalField() throws Exception {
+		assumeTrue("Running only for RocksDBStateBackend until FLINK-6804 has been fixed.", stateBackend instanceof RocksDBStateBackend);
+		try {
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_D);
+			fail("Expected a state migration exception.");
+		} catch (Exception e) {
+			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
+				// StateMigrationException expected
+			} else {
+				throw e;
+			}
+		}
+	}
+
+	/**
+	 * Removing fields should require a state migration
+	 */
+	@Ignore("Ignore this test until FLINK-6801 has been fixed.")
+	@Test
+	public void testMissingField() throws Exception {
+		try {
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_E);
+			fail("Expected a state migration exception.");
+		} catch (Exception e) {
+			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
+				// StateMigrationException expected
+			} else {
+				throw e;
+			}
+		}
+	}
+
+	public void testPojoSerializerUpgrade(String classSourceA, String classSourceB) throws Exception {
+		final Configuration taskConfiguration = new Configuration();
+		final ExecutionConfig executionConfig = new ExecutionConfig();
+		final KeySelector<Long, Long> keySelector = new IdentityKeySelector<>();
+		final Collection<Long> inputs = Arrays.asList(1L, 2L, 45L, 67L, 1337L);
+
+		// run the program with classSourceA
+		File rootPath = temporaryFolder.newFolder();
+		File sourceFile = writeSourceFile(rootPath, POJO_NAME + ".java", classSourceA);
+		compileClass(sourceFile);
+
+		final ClassLoader classLoader = URLClassLoader.newInstance(
+			new URL[]{rootPath.toURI().toURL()},
+			Thread.currentThread().getContextClassLoader());
+
+		OperatorStateHandles stateHandles = runOperator(
+			taskConfiguration,
+			executionConfig,
+			new StreamMap<>(new StatefulMapper(true, false)),
+			keySelector,
+			stateBackend,
+			classLoader,
+			null,
+			inputs);
+
+		// run the program with classSourceB
+		rootPath = temporaryFolder.newFolder();
+
+		sourceFile = writeSourceFile(rootPath, POJO_NAME + ".java", classSourceB);
+		compileClass(sourceFile);
+
+		final ClassLoader classLoaderB = URLClassLoader.newInstance(
+			new URL[]{rootPath.toURI().toURL()},
+			Thread.currentThread().getContextClassLoader());
+
+		runOperator(
+			taskConfiguration,
+			executionConfig,
+			new StreamMap<>(new StatefulMapper(true, true)),
+			keySelector,
+			stateBackend,
+			classLoaderB,
+			stateHandles,
+			inputs);
+	}
+
+	private OperatorStateHandles runOperator(
+			Configuration taskConfiguration,
+			ExecutionConfig executionConfig,
+			OneInputStreamOperator<Long, Long> operator,
+			KeySelector<Long, Long> keySelector,
+			StateBackend stateBackend,
+			ClassLoader classLoader,
+			OperatorStateHandles operatorStateHandles,
+			Iterable<Long> input) throws Exception {
+
+		final MockEnvironment environment = new MockEnvironment(
+			"test task",
+			32 * 1024,
+			new MockInputSplitProvider(),
+			256,
+			taskConfiguration,
+			executionConfig,
+			16,
+			1,
+			0,
+			classLoader);
+
+		final KeyedOneInputStreamOperatorTestHarness<Long, Long, Long> harness = new KeyedOneInputStreamOperatorTestHarness<>(
+			operator,
+			keySelector,
+			BasicTypeInfo.LONG_TYPE_INFO,
+			environment);
+
+		harness.setStateBackend(stateBackend);
+
+		harness.setup();
+		harness.initializeState(operatorStateHandles);
+		harness.open();
+
+		long timestamp = 0L;
+
+		for (Long value : input) {
+			harness.processElement(value, timestamp++);
+		}
+
+
+		long checkpointId = 1L;
+		long checkpointTimestamp = timestamp + 1L;
+
+		OperatorStateHandles stateHandles = harness.snapshot(checkpointId, checkpointTimestamp);
+
+		harness.close();
+
+		return stateHandles;
+	}
+
+	private static File writeSourceFile(File root, String name, String source) throws IOException {
+		File sourceFile = new File(root, name);
+
+		sourceFile.getParentFile().mkdirs();
+
+		try (FileWriter writer = new FileWriter(sourceFile)) {
+			writer.write(source);
+		}
+
+		return sourceFile;
+	}
+
+	private static int compileClass(File sourceFile) {
+		JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+		return compiler.run(null, null, null, sourceFile.getPath());
+	}
+
+	private static final class StatefulMapper extends RichMapFunction<Long, Long> implements CheckpointedFunction {
+
+		private static final long serialVersionUID = -520490739059396832L;
+
+		private final boolean keyed;
+		private final boolean verify;
+
+		private transient ValueState<Object> valueState;
+		private transient MapState<Object, Object> mapState;
+		private transient ListState<Object> listState;
+		private transient ReducingState<Object> reducingState;
+		private transient Class<?> pojoClass;
+		private transient Field fieldA;
+		private transient Field fieldB;
+
+		public StatefulMapper(boolean keyed, boolean verify) {
+			this.keyed = keyed;
+			this.verify = verify;
+		}
+
+		@Override
+		public Long map(Long value) throws Exception {
+			Object pojo = pojoClass.newInstance();
+
+			fieldA.set(pojo, value);
+			fieldB.set(pojo, value + "");
+
+			if (verify) {
+				assertEquals(pojo, valueState.value());
+
+				assertTrue(mapState.contains(pojo));
+				assertEquals(pojo, mapState.get(pojo));
+
+				Iterator<Object> listIterator = listState.get().iterator();
+
+				boolean elementFound = false;
+
+				while(listIterator.hasNext()) {
+					elementFound |= pojo.equals(listIterator.next());
+				}
+
+				assertTrue(elementFound);
+
+				assertEquals(pojo, reducingState.get());
+			} else {
+				valueState.update(pojo);
+				mapState.put(pojo, pojo);
+				listState.add(pojo);
+				reducingState.add(pojo);
+			}
+
+			return value;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			pojoClass = getRuntimeContext().getUserCodeClassLoader().loadClass(POJO_NAME);
+
+			fieldA = pojoClass.getDeclaredField("a");
+			fieldB = pojoClass.getDeclaredField("b");
+			fieldA.setAccessible(true);
+			fieldB.setAccessible(true);
+
+			if (keyed) {
+				valueState = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("valueState", (Class<Object>) pojoClass));
+				mapState = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>("mapState", (Class<Object>) pojoClass, (Class<Object>) pojoClass));
+				listState = context.getKeyedStateStore().getListState(new ListStateDescriptor<>("listState", (Class<Object>) pojoClass));
+
+				ReduceFunction<Object> reduceFunction = new FirstValueReducer<>();
+				reducingState = context.getKeyedStateStore().getReducingState(new ReducingStateDescriptor<>("reducingState", reduceFunction, (Class<Object>) pojoClass));
+			}
+		}
+	}
+
+	private static final class FirstValueReducer<T> implements ReduceFunction<T> {
+
+		private static final long serialVersionUID = -9222976423336835926L;
+
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+			return value1;
+		}
+	}
+
+	private static final class IdentityKeySelector<T> implements KeySelector<T, T> {
+
+		private static final long serialVersionUID = -3263628393881929147L;
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+	}
+}