Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 14:28:13 UTC

[07/13] flink git commit: [FLINK-5969] Add savepoint IT case that checks restore from 1.2

[FLINK-5969] Add savepoint IT case that checks restore from 1.2

The binary savepoints in this commit were created on the Flink 1.2.0 release
commit. The savepoint-creating tests are annotated @Ignore and have to be run
manually on the old Flink version; the restore tests run against master and
verify, via accumulators, that every operator saw its migrated state.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ed98f2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ed98f2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ed98f2e

Branch: refs/heads/master
Commit: 9ed98f2e5a32fb14de03b9a8ea1dd45851cc3a7e
Parents: 1882c90
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Apr 20 10:46:10 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 16:24:26 2017 +0200

----------------------------------------------------------------------
 .../utils/SavepointMigrationTestBase.java       |  24 +-
 ...atefulJobSavepointFrom11MigrationITCase.java | 562 ++++++++++++++
 ...atefulJobSavepointFrom12MigrationITCase.java | 769 +++++++++++++++++++
 .../StatefulUDFSavepointMigrationITCase.java    | 562 --------------
 ...eful-udf-migration-itcase-flink1.2-savepoint | Bin 0 -> 25245 bytes
 ...-migration-itcase-flink1.2-savepoint-rocksdb | Bin 0 -> 25256 bytes
 6 files changed, 1332 insertions(+), 585 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ed98f2e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 301fc72..c5672a2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -155,15 +155,10 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
 		}
 
 		LOG.info("Triggering savepoint.");
-		// Flink 1.2
+
 		final Future<Object> savepointResultFuture =
 				jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID(), Option.<String>empty()), DEADLINE.timeLeft());
 
-		// Flink 1.1
-//		final Future<Object> savepointResultFuture =
-//				jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID()), DEADLINE.timeLeft());
-
-
 		Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());
 
 		if (savepointResult instanceof JobManagerMessages.TriggerSavepointFailure) {
@@ -174,24 +169,7 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
 		final String jobmanagerSavepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath();
 		LOG.info("Saved savepoint: " + jobmanagerSavepointPath);
 
-		// Flink 1.2
 		FileUtils.moveFile(new File(new URI(jobmanagerSavepointPath).getPath()), new File(savepointPath));
-
-		// Flink 1.1
-		// Retrieve the savepoint from the testing job manager
-//		LOG.info("Requesting the savepoint.");
-//		Future<Object> savepointFuture = jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(jobmanagerSavepointPath), DEADLINE.timeLeft());
-//
-//		Savepoint savepoint = ((TestingJobManagerMessages.ResponseSavepoint) Await.result(savepointFuture, DEADLINE.timeLeft())).savepoint();
-//		LOG.info("Retrieved savepoint: " + jobmanagerSavepointPath + ".");
-//
-//		LOG.info("Storing savepoint to file.");
-//		Configuration config = new Configuration();
-//		config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-//		config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, "file:///Users/aljoscha/Downloads");
-//		String path = org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.createFromConfig(config).storeSavepoint(savepoint);
-//
-//		FileUtils.moveFile(new File(new URI(path).getPath()), new File(savepointPath));
 	}
 
 	@SafeVarargs
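
The change above leaves only the Flink 1.2 path: the test base asks the
JobManager actor to trigger a savepoint and then moves the resulting file to a
stable location. The following is a condensed, self-contained sketch of that
pattern, assuming the same Flink 1.2 runtime classes used in the diff; the
class and method names of the sketch itself are illustrative, not part of the
commit.

import java.io.File;
import java.net.URI;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;

import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public final class SavepointTriggerSketch {

	private SavepointTriggerSketch() {}

	/** Triggers a savepoint via the JobManager actor and moves it to targetPath. */
	static String triggerAndStoreSavepoint(
			ActorGateway jobManager,
			JobID jobId,
			FiniteDuration timeout,
			String targetPath) throws Exception {

		// Option.empty() means "use the cluster's configured savepoint directory"
		Future<Object> resultFuture = jobManager.ask(
				new JobManagerMessages.TriggerSavepoint(jobId, Option.<String>empty()),
				timeout);

		Object result = Await.result(resultFuture, timeout);

		if (result instanceof JobManagerMessages.TriggerSavepointFailure) {
			throw new RuntimeException(
					"Triggering the savepoint failed.",
					((JobManagerMessages.TriggerSavepointFailure) result).cause());
		}

		String savepointPath =
				((JobManagerMessages.TriggerSavepointSuccess) result).savepointPath();

		// move the savepoint to a stable path, e.g. under src/test/resources
		FileUtils.moveFile(new File(new URI(savepointPath).getPath()), new File(targetPath));

		return targetPath;
	}
}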

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed98f2e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
new file mode 100644
index 0000000..4d94d25
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.1 savepoint.
+ *
+ * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
+ */
+public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigrationTestBase {
+	private static final int NUM_SOURCE_ELEMENTS = 4;
+	private static final String EXPECTED_ELEMENTS_ACCUMULATOR = "NUM_EXPECTED_ELEMENTS";
+	private static final String SUCCESSFUL_CHECK_ACCUMULATOR = "SUCCESSFUL_CHECKS";
+
+	/**
+	 * This has to be manually executed to create the savepoint on Flink 1.1.
+	 */
+	@Test
+	@Ignore
+	public void testCreateSavepointOnFlink11() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// we only test the memory state backend for now
+		env.setStateBackend(new MemoryStateBackend());
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		// create source
+		env
+				.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+		executeAndSavepoint(
+				env,
+				"src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint",
+				new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+	}
+
+	/**
+	 * This has to be manually executed to create the savepoint on Flink 1.1.
+	 */
+	@Test
+	@Ignore
+	public void testCreateSavepointOnFlink11WithRocksDB() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		RocksDBStateBackend rocksBackend =
+				new RocksDBStateBackend(new MemoryStateBackend());
+//		rocksBackend.enableFullyAsyncSnapshots();
+		env.setStateBackend(rocksBackend);
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		// create source
+		env
+				.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+		executeAndSavepoint(
+				env,
+				"src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb",
+				new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+	}
+
+
+	@Test
+	public void testSavepointRestoreFromFlink11() throws Exception {
+
+		final int EXPECTED_SUCCESSFUL_CHECKS = 21;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// we only test the memory state backend for now
+		env.setStateBackend(new MemoryStateBackend());
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		// create source
+		env
+				.addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+		restoreAndExecute(
+				env,
+				getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"),
+				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+	}
+
+	@Test
+	public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception {
+
+		final int EXPECTED_SUCCESSFUL_CHECKS = 21;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// this variant restores using the RocksDB state backend
+		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		// create source
+		env
+				.addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+		restoreAndExecute(
+				env,
+				getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb"),
+				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+	}
+
+	private static class LegacyCheckpointedSource
+			implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
+
+		public static String CHECKPOINTED_STRING = "Here be dragons!";
+
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean isRunning = true;
+
+		private final int numElements;
+
+		public LegacyCheckpointedSource(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+			synchronized (ctx.getCheckpointLock()) {
+				for (long i = 0; i < numElements; i++) {
+					ctx.collect(new Tuple2<>(i, i));
+				}
+			}
+			while (isRunning) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public void restoreState(String state) throws Exception {
+			assertEquals(CHECKPOINTED_STRING, state);
+		}
+
+		@Override
+		public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return CHECKPOINTED_STRING;
+		}
+	}
+
+	private static class RestoringCheckingSource
+			extends RichSourceFunction<Tuple2<Long, Long>>
+			implements CheckpointedRestoring<String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean isRunning = true;
+
+		private final int numElements;
+
+		private String restoredState;
+
+		public RestoringCheckingSource(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+			assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+
+			synchronized (ctx.getCheckpointLock()) {
+				for (long i = 0; i < numElements; i++) {
+					ctx.collect(new Tuple2<>(i, i));
+				}
+			}
+
+			while (isRunning) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public void restoreState(String state) throws Exception {
+			restoredState = state;
+		}
+	}
+
+	public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements Checkpointed<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+				new Tuple2<>("hello", 42L);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+		}
+
+		@Override
+		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return CHECKPOINTED_TUPLE;
+		}
+	}
+
+	public static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private transient Tuple2<String, Long> restoredState;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+			restoredState = state;
+		}
+	}
+
+	public static class LegacyCheckpointedFlatMapWithKeyedState
+			extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements Checkpointed<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+				new Tuple2<>("hello", 42L);
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			getRuntimeContext().getState(stateDescriptor).update(value.f1);
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+		}
+
+		@Override
+		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return CHECKPOINTED_TUPLE;
+		}
+	}
+
+	public static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private transient Tuple2<String, Long> restoredState;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+			if (state == null) {
+				throw new RuntimeException("Missing key value state for " + value);
+			}
+
+			assertEquals(value.f1, state.value());
+			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+			restoredState = state;
+		}
+	}
+
+	public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			getRuntimeContext().getState(stateDescriptor).update(value.f1);
+		}
+	}
+
+	public static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+			if (state == null) {
+				throw new RuntimeException("Missing key value state for " + value);
+			}
+
+			assertEquals(value.f1, state.value());
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+		}
+	}
+
+	public static class CheckpointedUdfOperator
+			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
+
+		public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+			super(userFunction);
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+			output.collect(element);
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			output.emitWatermark(mark);
+		}
+
+		// Flink 1.1
+//		@Override
+//		public StreamTaskState snapshotOperatorState(
+//				long checkpointId, long timestamp) throws Exception {
+//			StreamTaskState result = super.snapshotOperatorState(checkpointId, timestamp);
+//
+//			AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(
+//					checkpointId,
+//					timestamp);
+//
+//			out.writeUTF(CHECKPOINTED_STRING);
+//
+//			result.setOperatorState(out.closeAndGetHandle());
+//
+//			return result;
+//		}
+	}
+
+	public static class RestoringCheckingUdfOperator
+			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		private String restoredState;
+
+		public RestoringCheckingUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+			super(userFunction);
+		}
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+			userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
+
+			assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			output.emitWatermark(mark);
+		}
+
+		@Override
+		public void restoreState(FSDataInputStream in) throws Exception {
+			super.restoreState(in);
+
+			DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in);
+
+			restoredState = streamWrapper.readUTF();
+		}
+	}
+
+	public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
+		private static final long serialVersionUID = 1L;
+
+		private final String accumulatorName;
+
+		int count = 0;
+
+		public AccumulatorCountingSink(String accumulatorName) {
+			this.accumulatorName = accumulatorName;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(accumulatorName, new IntCounter());
+		}
+
+		@Override
+		public void invoke(T value) throws Exception {
+			count++;
+			getRuntimeContext().getAccumulator(accumulatorName).add(1);
+		}
+	}
+}
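
The From11 case above hinges on the legacy state interfaces: on 1.1 a function
implements Checkpointed to both write and read its state, while the upgraded
job implements only CheckpointedRestoring, declaring that it can read the old
state one last time but will never snapshot it in that format again. A minimal
sketch of the pairing, with illustrative class names and state values:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.util.Collector;

public class LegacyStateMigrationSketch {

	/** Runs on the old version: snapshots (and could restore) its own state. */
	public static class WritingFlatMap extends RichFlatMapFunction<Long, Long>
			implements Checkpointed<String> {

		private static final long serialVersionUID = 1L;

		@Override
		public void flatMap(Long value, Collector<Long> out) {
			out.collect(value);
		}

		@Override
		public String snapshotState(long checkpointId, long checkpointTimestamp) {
			return "state-to-migrate";
		}

		@Override
		public void restoreState(String state) {
			// not exercised when the savepoint is first created
		}
	}

	/** Runs on the new version: reads the legacy state but never writes it again. */
	public static class RestoringFlatMap extends RichFlatMapFunction<Long, Long>
			implements CheckpointedRestoring<String> {

		private static final long serialVersionUID = 1L;

		private transient String restoredState;

		@Override
		public void flatMap(Long value, Collector<Long> out) {
			// restoredState is available here once the savepoint has been restored
			out.collect(value);
		}

		@Override
		public void restoreState(String state) {
			restoredState = state;
		}
	}
}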

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed98f2e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
new file mode 100644
index 0000000..e60cb5d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
@@ -0,0 +1,769 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.2 savepoint.
+ *
+ * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
+ *
+ * <p>The tests will time out if they don't see the required number of successful checks within
+ * a time limit.
+ */
+public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigrationTestBase {
+	private static final int NUM_SOURCE_ELEMENTS = 4;
+
+	/**
+	 * This has to be manually executed to create the savepoint on Flink 1.2.
+	 */
+	@Test
+	@Ignore
+	public void testCreateSavepointOnFlink12() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// we only test the memory state backend for now
+		env.setStateBackend(new MemoryStateBackend());
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		env
+				.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.keyBy(0)
+				.transform(
+						"timely_stateful_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new TimelyStatefulOperator()).uid("TimelyStatefulOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
+
+		executeAndSavepoint(
+				env,
+				"src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint",
+				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+	}
+
+	/**
+	 * This has to be manually executed to create the savepoint on Flink 1.2.
+	 */
+	@Test
+	@Ignore
+	public void testCreateSavepointOnFlink12WithRocksDB() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		RocksDBStateBackend rocksBackend =
+				new RocksDBStateBackend(new MemoryStateBackend());
+		env.setStateBackend(rocksBackend);
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		env
+				.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.keyBy(0)
+				.transform(
+						"timely_stateful_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new TimelyStatefulOperator()).uid("TimelyStatefulOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
+
+		executeAndSavepoint(
+				env,
+				"src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb",
+				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+	}
+
+
+	@Test
+	public void testSavepointRestoreFromFlink12() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// we only test the memory state backend for now
+		env.setStateBackend(new MemoryStateBackend());
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		env
+				.addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
+				.keyBy(0)
+				.transform(
+						"timely_stateful_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
+
+		restoreAndExecute(
+				env,
+				getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint"),
+				new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
+				new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+	}
+
+	@Test
+	public void testSavepointRestoreFromFlink12FromRocksDB() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// this variant restores using the RocksDB state backend
+		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		env
+				.addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
+				.keyBy(0)
+				.transform(
+						"timely_stateful_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
+
+		restoreAndExecute(
+				env,
+				getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb"),
+				new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
+				new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+	}
+
+	private static class LegacyCheckpointedSource
+			implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
+
+		public static String CHECKPOINTED_STRING = "Here be dragons!";
+
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean isRunning = true;
+
+		private final int numElements;
+
+		public LegacyCheckpointedSource(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+			ctx.emitWatermark(new Watermark(0));
+
+			synchronized (ctx.getCheckpointLock()) {
+				for (long i = 0; i < numElements; i++) {
+					ctx.collect(new Tuple2<>(i, i));
+				}
+			}
+
+			// don't emit a final watermark so that we don't trigger the registered event-time
+			// timers
+			while (isRunning) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public void restoreState(String state) throws Exception {
+			assertEquals(CHECKPOINTED_STRING, state);
+		}
+
+		@Override
+		public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return CHECKPOINTED_STRING;
+		}
+	}
+
+	private static class CheckingRestoringSource
+			extends RichSourceFunction<Tuple2<Long, Long>>
+			implements CheckpointedRestoring<String> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringSource.class + "_RESTORE_CHECK";
+
+		private volatile boolean isRunning = true;
+
+		private final int numElements;
+
+		private String restoredState;
+
+		public CheckingRestoringSource(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+			assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+
+			// immediately trigger any set timers
+			ctx.emitWatermark(new Watermark(1000));
+
+			synchronized (ctx.getCheckpointLock()) {
+				for (long i = 0; i < numElements; i++) {
+					ctx.collect(new Tuple2<>(i, i));
+				}
+			}
+
+			while (isRunning) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public void restoreState(String state) throws Exception {
+			restoredState = state;
+		}
+	}
+
+	public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements Checkpointed<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+				new Tuple2<>("hello", 42L);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+		}
+
+		@Override
+		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return CHECKPOINTED_TUPLE;
+		}
+	}
+
+	public static class CheckingRestoringFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMap.class + "_RESTORE_CHECK";
+
+		private transient Tuple2<String, Long> restoredState;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+			restoredState = state;
+		}
+	}
+
+	public static class LegacyCheckpointedFlatMapWithKeyedState
+			extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements Checkpointed<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+				new Tuple2<>("hello", 42L);
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			getRuntimeContext().getState(stateDescriptor).update(value.f1);
+
+			assertEquals(value.f1, getRuntimeContext().getState(stateDescriptor).value());
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+		}
+
+		@Override
+		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return CHECKPOINTED_TUPLE;
+		}
+	}
+
+	public static class CheckingRestoringFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMapWithKeyedState.class + "_RESTORE_CHECK";
+
+		private transient Tuple2<String, Long> restoredState;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+			if (state == null) {
+				throw new RuntimeException("Missing key value state for " + value);
+			}
+
+			assertEquals(value.f1, state.value());
+			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+			restoredState = state;
+		}
+	}
+
+	public static class CheckingRestoringFlatMapWithKeyedStateInOperator extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMapWithKeyedStateInOperator.class + "_RESTORE_CHECK";
+
+		private transient Tuple2<String, Long> restoredState;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+			if (state == null) {
+				throw new RuntimeException("Missing key value state for " + value);
+			}
+
+			assertEquals(value.f1, state.value());
+			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+			restoredState = state;
+		}
+	}
+
+	public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			getRuntimeContext().getState(stateDescriptor).update(value.f1);
+		}
+	}
+
+	public static class CheckingKeyedStateFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingKeyedStateFlatMap.class + "_RESTORE_CHECK";
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+			if (state == null) {
+				throw new RuntimeException("Missing key value state for " + value);
+			}
+
+			assertEquals(value.f1, state.value());
+			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+		}
+	}
+
+	public static class CheckpointedUdfOperator
+			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
+
+		public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+			super(userFunction);
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+			userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			output.emitWatermark(mark);
+		}
+
+		@Override
+		public void snapshotState(
+				FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
+			super.snapshotState(out, checkpointId, timestamp);
+
+			DataOutputViewStreamWrapper streamWrapper = new DataOutputViewStreamWrapper(out);
+
+			streamWrapper.writeUTF(CHECKPOINTED_STRING);
+			streamWrapper.flush();
+		}
+	}
+
+	public static class CheckingRestoringUdfOperator
+			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringUdfOperator.class + "_RESTORE_CHECK";
+
+		private String restoredState;
+
+		public CheckingRestoringUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+			super(userFunction);
+		}
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+			userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
+
+			assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			output.emitWatermark(mark);
+		}
+
+		@Override
+		public void restoreState(FSDataInputStream in) throws Exception {
+			super.restoreState(in);
+
+			DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in);
+
+			restoredState = streamWrapper.readUTF();
+		}
+	}
+
+	public static class TimelyStatefulOperator
+			extends AbstractStreamOperator<Tuple2<Long, Long>>
+			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> {
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+		private transient InternalTimerService<Long> timerService;
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+
+			timerService = getInternalTimerService(
+					"timer",
+					LongSerializer.INSTANCE,
+					this);
+
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+			ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
+					element.getValue().f0,
+					LongSerializer.INSTANCE,
+					stateDescriptor);
+
+			state.update(element.getValue().f1);
+
+			timerService.registerEventTimeTimer(element.getValue().f0, timerService.currentWatermark() + 10);
+			timerService.registerProcessingTimeTimer(element.getValue().f0, timerService.currentProcessingTime() + 30_000);
+
+			output.collect(element);
+		}
+
+		@Override
+		public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {
+
+		}
+
+		@Override
+		public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {
+
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			output.emitWatermark(mark);
+		}
+	}
+
+	public static class CheckingTimelyStatefulOperator
+			extends AbstractStreamOperator<Tuple2<Long, Long>>
+			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> {
+		private static final long serialVersionUID = 1L;
+
+		public static final String SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PROCESS_CHECKS";
+		public static final String SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_ET_CHECKS";
+		public static final String SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PT_CHECKS";
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+		private transient InternalTimerService<Long> timerService;
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+
+			timerService = getInternalTimerService(
+					"timer",
+					LongSerializer.INSTANCE,
+					this);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, new IntCounter());
+			getRuntimeContext().addAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, new IntCounter());
+			getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+			ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
+					element.getValue().f0,
+					LongSerializer.INSTANCE,
+					stateDescriptor);
+
+			assertEquals(state.value(), element.getValue().f1);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR).add(1);
+
+			output.collect(element);
+		}
+
+		@Override
+		public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {
+			ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
+					timer.getNamespace(),
+					LongSerializer.INSTANCE,
+					stateDescriptor);
+
+			assertEquals(state.value(), timer.getNamespace());
+			getRuntimeContext().getAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR).add(1);
+		}
+
+		@Override
+		public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {
+			ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
+					timer.getNamespace(),
+					LongSerializer.INSTANCE,
+					stateDescriptor);
+
+			assertEquals(state.value(), timer.getNamespace());
+			getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR).add(1);
+		}
+	}
+
+	public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
+		private static final long serialVersionUID = 1L;
+
+		public static final String NUM_ELEMENTS_ACCUMULATOR = AccumulatorCountingSink.class + "_NUM_ELEMENTS";
+
+		int count = 0;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(NUM_ELEMENTS_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void invoke(T value) throws Exception {
+			count++;
+			getRuntimeContext().getAccumulator(NUM_ELEMENTS_ACCUMULATOR).add(1);
+		}
+	}
+}
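
Note that both migration jobs assign identical .uid(...) strings to
corresponding operators even where the classes differ (LegacyCheckpointedFlatMap
vs. CheckingRestoringFlatMap, and so on). State in a savepoint is addressed by
an operator ID derived from that uid, which is what lets the restoring job
locate state written by a differently-implemented operator. A small
illustrative sketch; the element values and names here are made up:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class UidMatchingSketch {

	private UidMatchingSketch() {}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Tuple2<Long, Long>> stream = env
				.fromElements(Tuple2.of(0L, 0L), Tuple2.of(1L, 1L))
				.map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
					@Override
					public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
						return value;
					}
				})
				// the uid, not the class name, identifies this operator's state
				// in a savepoint; keep it stable across versions
				.uid("LegacyCheckpointedFlatMap");

		stream.print();
		env.execute("uid-matching-sketch");
	}
}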

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed98f2e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
deleted file mode 100644
index 10a8998..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
+++ /dev/null
@@ -1,562 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing.utils;
-
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * This verifies that we can restore a complete job from a Flink 1.1 savepoint.
- *
- * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
- */
-public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestBase {
-	private static final int NUM_SOURCE_ELEMENTS = 4;
-	private static final String EXPECTED_ELEMENTS_ACCUMULATOR = "NUM_EXPECTED_ELEMENTS";
-	private static final String SUCCESSFUL_CHECK_ACCUMULATOR = "SUCCESSFUL_CHECKS";
-
-	/**
-	 * This has to be manually executed to create the savepoint on Flink 1.1.
-	 */
-	@Test
-	@Ignore
-	public void testCreateSavepointOnFlink11() throws Exception {
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		// for now we only test the memory state backend
-		env.setStateBackend(new MemoryStateBackend());
-		env.enableCheckpointing(500);
-		env.setParallelism(4);
-		env.setMaxParallelism(4);
-
-		// build the test pipeline
-		env
-				.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
-				.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
-				.keyBy(0)
-				.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
-				.keyBy(0)
-				.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
-				.keyBy(0)
-				.transform(
-						"custom_operator",
-						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
-						new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
-				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
-
-		executeAndSavepoint(
-				env,
-				"src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint",
-				new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
-	}
-
-	/**
-	 * This has to be manually executed to create the savepoint on Flink 1.1.
-	 */
-	@Test
-	@Ignore
-	public void testCreateSavepointOnFlink11WithRocksDB() throws Exception {
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		RocksDBStateBackend rocksBackend =
-				new RocksDBStateBackend(new MemoryStateBackend());
-//		rocksBackend.enableFullyAsyncSnapshots();
-		env.setStateBackend(rocksBackend);
-		env.enableCheckpointing(500);
-		env.setParallelism(4);
-		env.setMaxParallelism(4);
-
-		// build the test pipeline
-		env
-				.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
-				.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
-				.keyBy(0)
-				.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
-				.keyBy(0)
-				.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
-				.keyBy(0)
-				.transform(
-						"custom_operator",
-						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
-						new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
-				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
-
-		executeAndSavepoint(
-				env,
-				"src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb",
-				new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
-	}
-
-	@Test
-	public void testSavepointRestoreFromFlink11() throws Exception {
-
-		final int EXPECTED_SUCCESSFUL_CHECKS = 21;
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		// for now we only test the memory state backend
-		env.setStateBackend(new MemoryStateBackend());
-		env.enableCheckpointing(500);
-		env.setParallelism(4);
-		env.setMaxParallelism(4);
-
-		// build the test pipeline
-		env
-				.addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
-				.flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
-				.keyBy(0)
-				.flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
-				.keyBy(0)
-				.flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
-				.keyBy(0)
-				.transform(
-						"custom_operator",
-						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
-						new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
-				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
-
-		restoreAndExecute(
-				env,
-				getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"),
-				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
-	}
-
-	@Test
-	public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception {
-
-		final int EXPECTED_SUCCESSFUL_CHECKS = 21;
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		// test restoring with the RocksDB state backend (backed by the memory backend)
-		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
-		env.enableCheckpointing(500);
-		env.setParallelism(4);
-		env.setMaxParallelism(4);
-
-		// build the test pipeline
-		env
-				.addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
-				.flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
-				.keyBy(0)
-				.flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
-				.keyBy(0)
-				.flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
-				.keyBy(0)
-				.transform(
-						"custom_operator",
-						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
-						new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
-				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
-
-		restoreAndExecute(
-				env,
-				getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb"),
-				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
-	}
-
-	private static class LegacyCheckpointedSource
-			implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
-
-		public static final String CHECKPOINTED_STRING = "Here be dragons!";
-
-		private static final long serialVersionUID = 1L;
-
-		private volatile boolean isRunning = true;
-
-		private final int numElements;
-
-		public LegacyCheckpointedSource(int numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
-
-			synchronized (ctx.getCheckpointLock()) {
-				for (long i = 0; i < numElements; i++) {
-					ctx.collect(new Tuple2<>(i, i));
-				}
-			}
-			while (isRunning) {
-				Thread.sleep(20);
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-
-		@Override
-		public void restoreState(String state) throws Exception {
-			assertEquals(CHECKPOINTED_STRING, state);
-		}
-
-		@Override
-		public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return CHECKPOINTED_STRING;
-		}
-	}
-
-	private static class RestoringCheckingSource
-			extends RichSourceFunction<Tuple2<Long, Long>>
-			implements CheckpointedRestoring<String> {
-
-		private static final long serialVersionUID = 1L;
-
-		private volatile boolean isRunning = true;
-
-		private final int numElements;
-
-		private String restoredState;
-
-		public RestoringCheckingSource(int numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-
-			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
-			assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
-			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-
-			synchronized (ctx.getCheckpointLock()) {
-				for (long i = 0; i < numElements; i++) {
-					ctx.collect(new Tuple2<>(i, i));
-				}
-			}
-
-			while (isRunning) {
-				Thread.sleep(20);
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-
-		@Override
-		public void restoreState(String state) throws Exception {
-			restoredState = state;
-		}
-	}
-
-	public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-			implements Checkpointed<Tuple2<String, Long>> {
-
-		private static final long serialVersionUID = 1L;
-
-		public static final Tuple2<String, Long> CHECKPOINTED_TUPLE =
-				new Tuple2<>("hello", 42L);
-
-		@Override
-		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-			out.collect(value);
-		}
-
-		@Override
-		public void restoreState(Tuple2<String, Long> state) throws Exception {
-		}
-
-		@Override
-		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return CHECKPOINTED_TUPLE;
-		}
-	}
-
-	public static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-			implements CheckpointedRestoring<Tuple2<String, Long>> {
-
-		private static final long serialVersionUID = 1L;
-
-		private transient Tuple2<String, Long> restoredState;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-
-			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
-		}
-
-		@Override
-		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-			out.collect(value);
-
-			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
-			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-		}
-
-		@Override
-		public void restoreState(Tuple2<String, Long> state) throws Exception {
-			restoredState = state;
-		}
-	}
-
-	public static class LegacyCheckpointedFlatMapWithKeyedState
-			extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-			implements Checkpointed<Tuple2<String, Long>> {
-
-		private static final long serialVersionUID = 1L;
-
-		public static final Tuple2<String, Long> CHECKPOINTED_TUPLE =
-				new Tuple2<>("hello", 42L);
-
-		private final ValueStateDescriptor<Long> stateDescriptor =
-				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
-
-		@Override
-		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-			out.collect(value);
-
-			getRuntimeContext().getState(stateDescriptor).update(value.f1);
-		}
-
-		@Override
-		public void restoreState(Tuple2<String, Long> state) throws Exception {
-		}
-
-		@Override
-		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return CHECKPOINTED_TUPLE;
-		}
-	}
-
-	public static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-			implements CheckpointedRestoring<Tuple2<String, Long>> {
-
-		private static final long serialVersionUID = 1L;
-
-		private transient Tuple2<String, Long> restoredState;
-
-		private final ValueStateDescriptor<Long> stateDescriptor =
-				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-
-			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
-		}
-
-		@Override
-		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-			out.collect(value);
-
-			ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
-			if (state == null) {
-				throw new RuntimeException("Missing key value state for " + value);
-			}
-
-			assertEquals(value.f1, state.value());
-			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
-			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-		}
-
-		@Override
-		public void restoreState(Tuple2<String, Long> state) throws Exception {
-			restoredState = state;
-		}
-	}
-
-	public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final ValueStateDescriptor<Long> stateDescriptor =
-				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
-
-		@Override
-		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-			out.collect(value);
-
-			getRuntimeContext().getState(stateDescriptor).update(value.f1);
-		}
-	}
-
-	public static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final ValueStateDescriptor<Long> stateDescriptor =
-				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-
-			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
-		}
-
-		@Override
-		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-			out.collect(value);
-
-			ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
-			if (state == null) {
-				throw new RuntimeException("Missing key value state for " + value);
-			}
-
-			assertEquals(value.f1, state.value());
-			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-		}
-	}
-
-	public static class CheckpointedUdfOperator
-			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
-			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-		private static final long serialVersionUID = 1L;
-
-		private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
-
-		public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
-			super(userFunction);
-		}
-
-		@Override
-		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
-			output.collect(element);
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-			output.emitWatermark(mark);
-		}
-
-		// Flink 1.1
-//		@Override
-//		public StreamTaskState snapshotOperatorState(
-//				long checkpointId, long timestamp) throws Exception {
-//			StreamTaskState result = super.snapshotOperatorState(checkpointId, timestamp);
-//
-//			AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(
-//					checkpointId,
-//					timestamp);
-//
-//			out.writeUTF(CHECKPOINTED_STRING);
-//
-//			result.setOperatorState(out.closeAndGetHandle());
-//
-//			return result;
-//		}
-	}
-
-	public static class RestoringCheckingUdfOperator
-			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
-			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-		private static final long serialVersionUID = 1L;
-
-		private String restoredState;
-
-		public RestoringCheckingUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
-			super(userFunction);
-		}
-
-		@Override
-		public void open() throws Exception {
-			super.open();
-		}
-
-		@Override
-		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
-			userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
-
-			assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState);
-			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-			output.emitWatermark(mark);
-		}
-
-		@Override
-		public void restoreState(FSDataInputStream in) throws Exception {
-			super.restoreState(in);
-
-			DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in);
-
-			restoredState = streamWrapper.readUTF();
-		}
-	}
-
-	public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
-		private static final long serialVersionUID = 1L;
-
-		private final String accumulatorName;
-
-		int count = 0;
-
-		public AccumulatorCountingSink(String accumulatorName) {
-			this.accumulatorName = accumulatorName;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-
-			getRuntimeContext().addAccumulator(accumulatorName, new IntCounter());
-		}
-
-		@Override
-		public void invoke(T value) throws Exception {
-			count++;
-			getRuntimeContext().getAccumulator(accumulatorName).add(1);
-		}
-	}
-}
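
The verification pattern in the deleted test carries over to its replacements: the checks run
inside the streaming tasks, so each successful assertion bumps an accumulator, and the test
base compares the accumulators against expected totals once the job has run. Condensed from
the deleted code above, the three pieces of that handshake are:

		// in open() of a function that should observe restored state:
		getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());

		// wherever a restored value is checked:
		assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
		getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);

		// in the test method, wait for the expected number of successful checks:
		restoreAndExecute(
				env,
				getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"),
				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));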

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed98f2e/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint
new file mode 100644
index 0000000..a8d19f2
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed98f2e/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb
new file mode 100644
index 0000000..548993f
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb differ