You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:20 UTC

[05/15] flink git commit: [FLINK-5366] SavepointUtil into SavepointMigrationTestBase/Add Test

[FLINK-5366] SavepointUtil into SavepointMigrationTestBase/Add Test

This also changes how the savepoint is performed: we now wait for
accumulators to signal that a job is ready for savepointing.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cbd9f5d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cbd9f5d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cbd9f5d

Branch: refs/heads/master
Commit: 2cbd9f5d1ba43059b8bf748f97d2392b1e8f0ab3
Parents: 74df763
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Dec 19 12:18:49 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../utils/SavepointMigrationTestBase.java       | 241 ++++++++
 .../test/checkpointing/utils/SavepointUtil.java | 341 -----------
 .../StatefulUDFSavepointMigrationITCase.java    | 562 +++++++++++++++++++
 .../utils/UserFunctionStateJob.java             | 113 ----
 ...eful-udf-migration-itcase-flink1.1-savepoint | Bin 0 -> 27146 bytes
 ...-migration-itcase-flink1.1-savepoint-rocksdb | Bin 0 -> 22283 bytes
 6 files changed, 803 insertions(+), 454 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
new file mode 100644
index 0000000..80a66ac
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static junit.framework.Assert.fail;
+
+/**
+ * Base class for savepoint migration tests.
+ *
+ * <p>Before every test a {@link LocalFlinkMiniCluster} with a single task manager and
+ * {@link #DEFAULT_PARALLELISM} slots is started; checkpoints and savepoints are written to
+ * directories below a JUnit {@link TemporaryFolder}. After every test the cluster is stopped.
+ *
+ * <p>Subclasses use {@link #executeAndSavepoint} to run a job and draw a savepoint from it, and
+ * {@link #restoreAndExecute} to run a job starting from an existing savepoint. Both helpers poll
+ * the job's accumulators to decide when the job has reached the expected state.
+ */
+public class SavepointMigrationTestBase extends TestBaseUtils {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);
+
+	/**
+	 * Time budget for a single call of {@link #executeAndSavepoint} or {@link #restoreAndExecute}.
+	 *
+	 * <p>Only the duration is static. The actual {@link Deadline} is created per call, because a
+	 * static {@code Deadline} would start ticking when the class is loaded and would be shared
+	 * (and eventually exhausted) by all tests running in the same JVM.
+	 */
+	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+	/** Pause between two consecutive accumulator queries. */
+	private static final long ACCUMULATOR_POLL_INTERVAL_MILLIS = 100;
+
+	protected static final int DEFAULT_PARALLELISM = 4;
+	protected LocalFlinkMiniCluster cluster = null;
+
+	/**
+	 * Resolves a classpath resource to a file name.
+	 *
+	 * @param filename name of the resource, relative to the classpath root
+	 * @return the file part of the resource URL
+	 * @throws NullPointerException if the resource cannot be found on the classpath
+	 */
+	protected static String getResourceFilename(String filename) {
+		ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		if (resource == null) {
+			throw new NullPointerException("Missing snapshot resource: " + filename);
+		}
+		return resource.getFile();
+	}
+
+	/**
+	 * Creates the temporary checkpoint and savepoint directories and starts the mini cluster.
+	 */
+	@Before
+	public void setup() throws Exception {
+
+		// Flink configuration
+		final Configuration config = new Configuration();
+
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
+
+		final File checkpointDir = tempFolder.newFolder("checkpoints").getAbsoluteFile();
+		final File savepointDir = tempFolder.newFolder("savepoints").getAbsoluteFile();
+
+		if (!checkpointDir.exists() || !savepointDir.exists()) {
+			throw new Exception("Test setup failed: failed to create (temporary) directories.");
+		}
+
+		LOG.info("Created temporary checkpoint directory: {}.", checkpointDir);
+		LOG.info("Created savepoint directory: {}.", savepointDir);
+
+		config.setString(ConfigConstants.STATE_BACKEND, "memory");
+		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
+		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
+		config.setString("state.savepoints.dir", savepointDir.toURI().toString());
+
+		cluster = TestBaseUtils.startCluster(config, false);
+	}
+
+	/**
+	 * Stops the mini cluster that was started in {@link #setup()}.
+	 */
+	@After
+	public void teardown() throws Exception {
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+	}
+
+	/**
+	 * Submits the job defined by {@code env}, waits until all expected accumulators have reached
+	 * their expected value, triggers a savepoint and moves the savepoint file to
+	 * {@code savepointPath}.
+	 *
+	 * @param env environment holding the job to execute
+	 * @param savepointPath file to which the drawn savepoint is moved
+	 * @param expectedAccumulators pairs of accumulator name and the value that signals that the
+	 *                             job is ready for savepointing
+	 * @throws Exception if submitting the job, querying accumulators, triggering the savepoint or
+	 *                   moving the savepoint file fails
+	 */
+	protected void executeAndSavepoint(
+			StreamExecutionEnvironment env,
+			String savepointPath,
+			Tuple2<String, Integer>... expectedAccumulators) throws Exception {
+
+		// fresh time budget for this invocation
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		// Retrieve the job manager
+		ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
+
+		// Submit the job
+		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
+
+		LOG.info("Submitted job {} and waiting...", jobSubmissionResult.getJobID());
+
+		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+
+		if (!waitForAccumulators(clusterClient, jobSubmissionResult, deadline, expectedAccumulators)) {
+			fail("Did not see the expected accumulator results within time limit.");
+		}
+
+		LOG.info("Triggering savepoint.");
+		// Flink 1.2
+		final Future<Object> savepointResultFuture =
+				jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID(), Option.<String>empty()), deadline.timeLeft());
+
+		// Flink 1.1
+//		final Future<Object> savepointResultFuture =
+//				jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID()), DEADLINE.timeLeft());
+
+
+		Object savepointResult = Await.result(savepointResultFuture, deadline.timeLeft());
+
+		if (savepointResult instanceof JobManagerMessages.TriggerSavepointFailure) {
+			fail("Error drawing savepoint: " + ((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause());
+		}
+		if (!(savepointResult instanceof JobManagerMessages.TriggerSavepointSuccess)) {
+			// fail with a readable message instead of a ClassCastException below
+			fail("Unexpected response to savepoint trigger: " + savepointResult);
+		}
+
+		// the job manager reports where it wrote the savepoint
+		final String jobmanagerSavepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath();
+		LOG.info("Saved savepoint: {}", jobmanagerSavepointPath);
+
+		// Flink 1.2
+		// the reported path is a URI; move the file it points to to the requested location
+		FileUtils.moveFile(new File(new URI(jobmanagerSavepointPath).getPath()), new File(savepointPath));
+
+		// Flink 1.1
+		// Retrieve the savepoint from the testing job manager
+//		LOG.info("Requesting the savepoint.");
+//		Future<Object> savepointFuture = jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(jobmanagerSavepointPath), DEADLINE.timeLeft());
+//
+//		Savepoint savepoint = ((TestingJobManagerMessages.ResponseSavepoint) Await.result(savepointFuture, DEADLINE.timeLeft())).savepoint();
+//		LOG.info("Retrieved savepoint: " + jobmanagerSavepointPath + ".");
+//
+//		LOG.info("Storing savepoint to file.");
+//		Configuration config = new Configuration();
+//		config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
+//		config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, "file:///Users/aljoscha/Downloads");
+//		String path = org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.createFromConfig(config).storeSavepoint(savepoint);
+//
+//		FileUtils.moveFile(new File(new URI(path).getPath()), new File(savepointPath));
+	}
+
+	/**
+	 * Submits the job defined by {@code env} so that it restores from the savepoint at
+	 * {@code savepointPath} and waits until all expected accumulators have reached their expected
+	 * value.
+	 *
+	 * @param env environment holding the job to execute
+	 * @param savepointPath path of the savepoint to restore from
+	 * @param expectedAccumulators pairs of accumulator name and the value that signals that the
+	 *                             restored job behaved as expected
+	 * @throws Exception if submitting the job or querying its accumulators fails
+	 */
+	protected void restoreAndExecute(
+			StreamExecutionEnvironment env,
+			String savepointPath,
+			Tuple2<String, Integer>... expectedAccumulators) throws Exception {
+
+		// fresh time budget for this invocation
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		// Block until the leading job manager is known; the gateway itself is not needed here
+		Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
+
+		// Submit the job
+		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+
+		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
+
+		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+
+		if (!waitForAccumulators(clusterClient, jobSubmissionResult, deadline, expectedAccumulators)) {
+			fail("Did not see the expected accumulator results within time limit.");
+		}
+	}
+
+	/**
+	 * Polls the accumulators of the given job until they all match the expected values or the
+	 * deadline has passed. Sleeps before every query, so at least one poll interval elapses
+	 * before the first check.
+	 *
+	 * @return {@code true} if all expected values were seen before the deadline expired,
+	 *         {@code false} otherwise
+	 */
+	private static boolean waitForAccumulators(
+			StandaloneClusterClient clusterClient,
+			JobSubmissionResult job,
+			Deadline deadline,
+			Tuple2<String, Integer>[] expectedAccumulators) throws Exception {
+
+		while (deadline.hasTimeLeft()) {
+			Thread.sleep(ACCUMULATOR_POLL_INTERVAL_MILLIS);
+			Map<String, Object> accumulators = clusterClient.getAccumulators(job.getJobID());
+			LOG.debug("Accumulators of job {}: {}", job.getJobID(), accumulators);
+
+			if (hasExpectedValues(accumulators, expectedAccumulators)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * Checks whether every expected accumulator is present in {@code accumulators} with exactly
+	 * the expected value.
+	 */
+	private static boolean hasExpectedValues(
+			Map<String, Object> accumulators,
+			Tuple2<String, Integer>[] expectedAccumulators) {
+
+		for (Tuple2<String, Integer> expected : expectedAccumulators) {
+			Integer actual = (Integer) accumulators.get(expected.f0);
+			if (actual == null) {
+				LOG.debug("No accumulator value yet for {}.", expected);
+				return false;
+			}
+			if (!actual.equals(expected.f1)) {
+				LOG.debug("Accumulator value {} does not yet match {}.", actual, expected);
+				return false;
+			}
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
deleted file mode 100644
index 85e21c5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing.utils;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-public class SavepointUtil {
-
-	// list of JobGraphs to create savepoints for
-	private static final ArrayList<Class<? extends SavepointTestJob>> savepointJobs = new ArrayList<>();
-	static {
-		savepointJobs.add(UserFunctionStateJob.class);
-	}
-
-	private static final Logger LOG = LoggerFactory.getLogger(SavepointUtil.class);
-	private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-	private static final String SAVEPOINT_BASE_DIR = "./flink-tests/src/test/resources/savepoints/";
-
-	private static final int STATE_WAIT_FOR_JOB = 0;
-	private static final int STATE_REQUEST_SAVEPOINT = 1;
-	private static final int STATE_SAVEPOINT_DONE = 2;
-	private static final int STATE_WAIT_FOR_TEST_JOB = 3;
-	private static final int STATE_TEST_RESULT = 4;
-	private static final int STATE_END = 5;
-
-	private static volatile int state = STATE_WAIT_FOR_JOB;
-
-	private static TestingCluster flink = null;
-	private static ActorGateway jobManager = null;
-	private static JobID jobId = null;
-	private static File savepointDir = null;
-	private static Exception testResult = null;
-
-	public static void main(String[] args) throws Exception {
-
-		// clean up
-//		FileUtils.deleteDirectory(new File(SAVEPOINT_BASE_DIR));
-
-		for (Class<? extends SavepointTestJob> testJob : savepointJobs) {
-			SavepointTestJob job = testJob.newInstance();
-
-//			runJobAndCreateSavepoint(job);
-
-			runJobAndCompareState(job);
-
-			triggerEndOfTest();
-		}
-	}
-
-	public static synchronized void triggerSavepoint() {
-		SavepointUtil.state = SavepointUtil.STATE_REQUEST_SAVEPOINT;
-	}
-
-	public static synchronized boolean allowStateChange() {
-		return SavepointUtil.state < SavepointUtil.STATE_REQUEST_SAVEPOINT;
-	}
-
-	public static synchronized void triggerOrTestSavepoint(RichFunction function, Object expected, Object actual) throws Exception {
-		if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_TEST_JOB) {
-			if (expected.equals(actual)) {
-				LOG.info("Test was successful.");
-				SavepointUtil.testResult = null;
-				SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
-			} else {
-				LOG.info("Test failed.");
-				SavepointUtil.testResult = new Exception("Comparison of state failed. Expected: " + expected + " but was: " + actual);
-				SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
-			}
-		} else if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_JOB) {
-			final StateCondition condition = new StateCondition(function.getClass(), function.getRuntimeContext().getIndexOfThisSubtask());
-			if (!testCounters.containsKey(condition)) {
-				testCounters.put(condition, 0);
-			}
-			final Integer counter = testCounters.get(condition);
-			testCounters.put(condition, counter + 1);
-			// check if all counters are ready
-			if (checkIfReadyForSavepoint()) {
-				SavepointUtil.state = SavepointUtil.STATE_REQUEST_SAVEPOINT;
-			}
-		}
-	}
-
-	public static void triggerEndOfTest() throws Exception {
-		LOG.info("Cancelling Flink.");
-		if (flink != null) {
-			flink.stop();
-		}
-		SavepointUtil.state = SavepointUtil.STATE_END;
-	}
-
-	public static void runJobAndCreateSavepoint(SavepointTestJob job) throws Exception {
-		LOG.info("Waiting for job.");
-		SavepointUtil.state = SavepointUtil.STATE_WAIT_FOR_JOB;
-
-		final Thread t = new Thread(new SavepointPerformer());
-		t.start();
-
-		runJob(job);
-
-		while(SavepointUtil.state != SavepointUtil.STATE_SAVEPOINT_DONE && DEADLINE.hasTimeLeft()) {
-			Thread.sleep(100);
-		}
-	}
-
-	public static void runJobAndCompareState(SavepointTestJob job) throws Exception {
-		LOG.info("Waiting for test job.");
-		SavepointUtil.state = SavepointUtil.STATE_WAIT_FOR_TEST_JOB;
-
-		runJob(job);
-
-		while(SavepointUtil.state != SavepointUtil.STATE_TEST_RESULT && DEADLINE.hasTimeLeft()) {
-			Thread.sleep(100);
-		}
-
-		if (SavepointUtil.state != SavepointUtil.STATE_TEST_RESULT) {
-			throw new Exception("No test result available.");
-		}
-		if (testResult != null) {
-			throw testResult;
-		}
-	}
-
-	public static void setTestResult(Exception e) {
-		SavepointUtil.testResult = e;
-		SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	private static void runJob(SavepointTestJob job) throws Exception {
-		// Config
-		int numTaskManagers = 2;
-		int numSlotsPerTaskManager = 2;
-		int parallelism = numTaskManagers * numSlotsPerTaskManager;
-		String savepointPath = SAVEPOINT_BASE_DIR + job.getClass().getSimpleName();
-
-		// Flink configuration
-		final Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-
-		final File checkpointDir = File.createTempFile("checkpoints", Long.toString(System.nanoTime()));
-		savepointDir = new File(savepointPath);
-		savepointDir.mkdirs();
-
-		if (!checkpointDir.exists() || !savepointDir.exists()) {
-			throw new Exception("Test setup failed: failed to create (temporary) directories.");
-		}
-
-		LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
-		LOG.info("Created savepoint directory: " + savepointDir + ".");
-
-		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
-		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
-		config.setString("state.savepoints.dir", savepointDir.toURI().toString());
-
-		LOG.info("Flink configuration: " + config + ".");
-
-		// Start Flink
-		flink = new TestingCluster(config);
-		flink.start();
-
-		// Retrieve the job manager
-		jobManager = Await.result(flink.leaderGateway().future(), DEADLINE.timeLeft());
-
-		// Submit the job
-		final JobGraph jobGraph = job.createJobGraph();
-		if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_JOB) {
-			savepointCondition = job.getSavepointCondition();
-			testCounters.clear();
-		} else if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_TEST_JOB) {
-			final File[] dir = savepointDir.listFiles();
-			if (dir.length == 0) {
-				throw new RuntimeException("Savepoint of " + job.getClass().getSimpleName() + " does not exist.");
-			}
-			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(dir[0].getAbsolutePath()));
-		}
-		jobId = jobGraph.getJobID();
-
-		LOG.info("Submitting job " + jobGraph.getJobID() + " and waiting...");
-
-		flink.submitJobDetached(jobGraph);
-	}
-
-	private static final HashMap<StateCondition, Integer> testCounters = new HashMap<>();
-	private static SavepointCondition[] savepointCondition = null;
-
-	private static boolean checkIfReadyForSavepoint() {
-		for (SavepointCondition condition : savepointCondition) {
-			final StateCondition stateCondition = new StateCondition(condition.clazz, condition.subtask);
-			if (!testCounters.containsKey(stateCondition) || testCounters.get(stateCondition) != condition.invocation) {
-				return false;
-			}
-		}
-		return true;
-	}
-
-	private static void performSavepointAndShutdown() throws Exception {
-		LOG.info("Triggering a savepoint.");
-
-		// Flink 1.2
-		final Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.<String>empty()), DEADLINE.timeLeft());
-		// Flink 1.1
-//        final Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId), DEADLINE.timeLeft());
-
-		final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) Await.result(savepointPathFuture, DEADLINE.timeLeft())).savepointPath();
-		LOG.info("Saved savepoint: " + savepointPath);
-
-		// Retrieve the savepoint from the testing job manager
-		LOG.info("Requesting the savepoint.");
-		Future<Object> savepointFuture = jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(savepointPath), DEADLINE.timeLeft());
-
-		Savepoint savepoint = ((TestingJobManagerMessages.ResponseSavepoint) Await.result(savepointFuture, DEADLINE.timeLeft())).savepoint();
-		LOG.info("Retrieved savepoint: " + savepointPath + ".");
-
-		LOG.info("Storing savepoint to file.");
-
-		// Flink 1.2
-		// it might be that the savepoint has already been written to file in Flink 1.2
-		// this is just the command how to do it in 1.2
-//        org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepoint(savepointDir.getAbsolutePath(), savepoint);
-		// Flink 1.1
-		// this writes it for FLink 1.1
-//        Configuration config = new Configuration();
-//        config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-//        config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, "file://" + savepointDir.getAbsolutePath());
-//        org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.createFromConfig(config).storeSavepoint(savepoint);
-
-		LOG.info("Cancelling Flink.");
-		flink.stop();
-
-		SavepointUtil.state = SavepointUtil.STATE_SAVEPOINT_DONE;
-	}
-
-	private static class StateCondition {
-		private Class<?> clazz;
-		private Integer subtask;
-
-		StateCondition(Class<?> clazz, Integer subtask) {
-			this.clazz = clazz;
-			this.subtask = subtask;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) return true;
-			if (o == null || getClass() != o.getClass()) return false;
-
-			StateCondition that = (StateCondition) o;
-
-			return clazz.equals(that.clazz) && subtask.equals(that.subtask);
-		}
-
-		@Override
-		public int hashCode() {
-			int result = clazz.hashCode();
-			result = 31 * result + subtask.hashCode();
-			return result;
-		}
-	}
-
-	public static class SavepointCondition {
-		Class<? extends RichFunction> clazz;
-		int subtask;
-		int invocation;
-
-		SavepointCondition(Class<? extends RichFunction> clazz, int subtask, int invocation) {
-			this.clazz = clazz;
-			this.subtask = subtask;
-			this.invocation = invocation;
-		}
-	}
-
-	public interface SavepointTestJob {
-		JobGraph createJobGraph();
-
-		SavepointCondition[] getSavepointCondition();
-	}
-
-	private static class SavepointPerformer implements Runnable {
-
-		@Override
-		public void run() {
-			try {
-				while (SavepointUtil.state != SavepointUtil.STATE_END) {
-					Thread.sleep(100);
-					if (SavepointUtil.state == SavepointUtil.STATE_REQUEST_SAVEPOINT) {
-						try {
-							performSavepointAndShutdown();
-						} catch (Exception e) {
-							throw new RuntimeException("Performing savepoint failed.", e);
-						}
-					}
-				}
-			} catch (InterruptedException e) {
-				// stop execution
-			}
-			LOG.info("SavepointPerformer Thread finished.");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
new file mode 100644
index 0000000..cc21683
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.1 savepoint.
+ *
+ * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
+ */
+public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestBase {
+	private static final int NUM_SOURCE_ELEMENTS = 4;
+	private static final String EXPECTED_ELEMENTS_ACCUMULATOR = "NUM_EXPECTED_ELEMENTS";
+	private static final String SUCCESSFUL_CHECK_ACCUMULATOR = "SUCCESSFUL_CHECKS";
+
+	/**
+	 * Writes the memory-backend savepoint that {@link #testSavepointRestoreFromFlink11()} restores; ignored by default because it has to be executed manually on Flink 1.1.
+	 */
+	@Test
+	@Ignore
+	public void testCreateSavepointOnFlink11() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// this variant writes the savepoint with the memory state backend
+		env.setStateBackend(new MemoryStateBackend());
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		// build the pipeline; every stateful operator gets an explicit uid that the restoring job reuses
+		env
+				.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+		executeAndSavepoint(
+				env,
+				"src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint",
+				new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+	}
+
+	/**
+	 * Writes the RocksDB-backend savepoint that {@link #testSavepointRestoreFromFlink11FromRocksDB()} restores; ignored by default because it has to be executed manually on Flink 1.1.
+	 */
+	@Test
+	@Ignore
+	public void testCreateSavepointOnFlink11WithRocksDB() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		RocksDBStateBackend rocksBackend =
+				new RocksDBStateBackend(new MemoryStateBackend());
+//		rocksBackend.enableFullyAsyncSnapshots();
+		env.setStateBackend(rocksBackend);
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		// build the pipeline; every stateful operator gets an explicit uid that the restoring job reuses
+		env
+				.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+		executeAndSavepoint(
+				env,
+				"src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb",
+				new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+	}
+
+
+	@Test
+	public void testSavepointRestoreFromFlink11() throws Exception {
+
+		final int EXPECTED_SUCCESSFUL_CHECKS = 21; // presumably 1 (source) + 3 * 4 (flat maps) + 2 * 4 (custom operator and its wrapped function) — confirm
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// restore with the memory state backend, the backend testCreateSavepointOnFlink11() configures
+		env.setStateBackend(new MemoryStateBackend());
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		// same topology and uids as the savepoint-creating job, but with functions that verify the restored state
+		env
+				.addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+		restoreAndExecute(
+				env,
+				getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"),
+				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+	}
+
+	@Test
+	public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception {
+
+		final int EXPECTED_SUCCESSFUL_CHECKS = 21; // presumably 1 (source) + 3 * 4 (flat maps) + 2 * 4 (custom operator and its wrapped function) — confirm
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// restore with the RocksDB state backend (wrapping a memory state backend), as testCreateSavepointOnFlink11WithRocksDB() configures
+		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		// same topology and uids as the savepoint-creating job, but with functions that verify the restored state
+		env
+				.addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+		restoreAndExecute(
+				env,
+				getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb"),
+				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+	}
+
+	/**
+	 * Source of the savepoint-creating job. It emits {@code numElements} tuples {@code (i, i)}
+	 * while holding the checkpoint lock and then idles until it is cancelled. Its legacy
+	 * {@link Checkpointed} state is always {@link #CHECKPOINTED_STRING}.
+	 */
+	private static class LegacyCheckpointedSource
+			implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
+
+		/** Value written as this source's state; final because it is a shared constant. */
+		public static final String CHECKPOINTED_STRING = "Here be dragons!";
+
+		private static final long serialVersionUID = 1L;
+
+		// volatile: written by cancel(), read by the polling loop in run()
+		private volatile boolean isRunning = true;
+
+		private final int numElements;
+
+		public LegacyCheckpointedSource(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+			// emit all elements in one go while holding the checkpoint lock
+			synchronized (ctx.getCheckpointLock()) {
+				for (long i = 0; i < numElements; i++) {
+					ctx.collect(new Tuple2<>(i, i));
+				}
+			}
+			// keep the source alive after emitting; it only returns once cancel() was called
+			while (isRunning) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public void restoreState(String state) throws Exception {
+			assertEquals(CHECKPOINTED_STRING, state);
+		}
+
+		@Override
+		public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return CHECKPOINTED_STRING;
+		}
+	}
+
+	private static class RestoringCheckingSource
+			extends RichSourceFunction<Tuple2<Long, Long>>
+			implements CheckpointedRestoring<String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean isRunning = true;
+
+		private final int numElements;
+
+		private String restoredState;
+
+		public RestoringCheckingSource(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+			assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+
+			synchronized (ctx.getCheckpointLock()) {
+				for (long i = 0; i < numElements; i++) {
+					ctx.collect(new Tuple2<>(i, i));
+				}
+			}
+
+			while (isRunning) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public void restoreState(String state) throws Exception {
+			restoredState = state;
+		}
+	}
+
+	/**
+	 * Pass-through flat map of the savepoint-creating job whose legacy {@link Checkpointed}
+	 * state is always {@link #CHECKPOINTED_TUPLE}.
+	 */
+	public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements Checkpointed<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		/** Value written as this function's state; final because it is a shared constant. */
+		public static final Tuple2<String, Long> CHECKPOINTED_TUPLE =
+				new Tuple2<>("hello", 42L);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+			// intentionally empty: restored state is ignored by this function
+		}
+
+		@Override
+		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return CHECKPOINTED_TUPLE;
+		}
+	}
+
+	/**
+	 * Pass-through flat map of the restoring job. For every element it forwards the record,
+	 * asserts that the restored state equals {@code LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE}
+	 * and then records one successful check.
+	 */
+	public static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private transient Tuple2<String, Long> restoredState;
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+			restoredState = state;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			// counter that is bumped once per verified element
+			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			// forward first, verify afterwards
+			out.collect(value);
+			verifyRestoredState();
+		}
+
+		/** Asserts on the restored tuple and, if it matches, counts a successful check. */
+		private void verifyRestoredState() {
+			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+		}
+	}
+
+	/**
+	 * Pass-through flat map of the savepoint-creating job that has both kinds of state: the
+	 * legacy {@link Checkpointed} state (always {@link #CHECKPOINTED_TUPLE}) and a keyed
+	 * {@code ValueState<Long>} named "state-name" that is set to the {@code f1} field of each
+	 * element.
+	 */
+	public static class LegacyCheckpointedFlatMapWithKeyedState
+			extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements Checkpointed<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		/** Value written as this function's state; final because it is a shared constant. */
+		public static final Tuple2<String, Long> CHECKPOINTED_TUPLE =
+				new Tuple2<>("hello", 42L);
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			// remember the element's value in keyed state
+			getRuntimeContext().getState(stateDescriptor).update(value.f1);
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+			// intentionally empty: restored state is ignored by this function
+		}
+
+		@Override
+		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return CHECKPOINTED_TUPLE;
+		}
+	}
+
+	/**
+	 * Restoring counterpart of {@link LegacyCheckpointedFlatMapWithKeyedState}. For every element
+	 * it forwards the record, asserts that the keyed "state-name" value equals the element's
+	 * {@code f1} field and that the restored legacy state equals
+	 * {@code LegacyCheckpointedFlatMapWithKeyedState.CHECKPOINTED_TUPLE}, and then records one
+	 * successful check.
+	 */
+	public static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private transient Tuple2<String, Long> restoredState;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			// counter that is bumped once per verified element
+			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+			if (state == null) {
+				throw new RuntimeException("Missing key value state for " + value);
+			}
+
+			assertEquals(value.f1, state.value());
+			// compare against the constant of the function this class restores from, not the
+			// equally-valued constant of the unrelated LegacyCheckpointedFlatMap
+			assertEquals(LegacyCheckpointedFlatMapWithKeyedState.CHECKPOINTED_TUPLE, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+			restoredState = state;
+		}
+	}
+
+	/**
+	 * Pass-through flat map of the savepoint-creating job that only has keyed state: it forwards
+	 * each element and stores the element's {@code f1} field in the {@code ValueState<Long>}
+	 * named "state-name".
+	 */
+	public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			// forward first, then remember the value for the current key
+			out.collect(value);
+
+			final ValueState<Long> keyedValue = getRuntimeContext().getState(stateDescriptor);
+			keyedValue.update(value.f1);
+		}
+	}
+
+	/**
+	 * Restoring counterpart of {@link KeyedStateSettingFlatMap}: forwards each element, asserts
+	 * that the keyed "state-name" value equals the element's {@code f1} field and then records
+	 * one successful check.
+	 */
+	public static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			// counter that is bumped once per verified element
+			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			final ValueState<Long> keyedValue = getRuntimeContext().getState(stateDescriptor);
+			if (null == keyedValue) {
+				throw new RuntimeException("Missing key value state for " + value);
+			}
+
+			final Long expected = value.f1;
+			assertEquals(expected, keyedValue.value());
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+		}
+	}
+
+	/**
+	 * Custom operator of the savepoint-creating job. It wraps a flat map function and passes
+	 * every element through it. The operator-state snapshot that writes
+	 * {@code CHECKPOINTED_STRING} only compiles against Flink 1.1 and is therefore kept as
+	 * commented-out code below.
+	 */
+	public static class CheckpointedUdfOperator
+			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
+
+		public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+			super(userFunction);
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+			// Run the wrapped function instead of forwarding the record directly: the function
+			// emits the record itself and writes the keyed state that the wrapped function of
+			// RestoringCheckingUdfOperator asserts on after the restore.
+			userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			output.emitWatermark(mark);
+		}
+
+		// Flink 1.1
+//		@Override
+//		public StreamTaskState snapshotOperatorState(
+//				long checkpointId, long timestamp) throws Exception {
+//			StreamTaskState result = super.snapshotOperatorState(checkpointId, timestamp);
+//
+//			AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(
+//					checkpointId,
+//					timestamp);
+//
+//			out.writeUTF(CHECKPOINTED_STRING);
+//
+//			result.setOperatorState(out.closeAndGetHandle());
+//
+//			return result;
+//		}
+	}
+
+	/**
+	 * Restoring counterpart of {@link CheckpointedUdfOperator}. It reads a UTF string from the
+	 * restored operator state and, for every element, runs the wrapped function, asserts that the
+	 * restored string equals {@code CheckpointedUdfOperator.CHECKPOINTED_STRING} and records one
+	 * successful check.
+	 */
+	public static class RestoringCheckingUdfOperator
+			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		// only ever assigned in restoreState(), so it is excluded from serialization like the
+		// restored state of the sibling restoring functions
+		private transient String restoredState;
+
+		public RestoringCheckingUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+			super(userFunction);
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+			userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
+
+			assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState);
+			// NOTE(review): this operator never registers the accumulator itself; presumably the
+			// wrapped function's open() has added it to the shared runtime context — confirm
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			output.emitWatermark(mark);
+		}
+
+		@Override
+		public void restoreState(FSDataInputStream in) throws Exception {
+			// let the superclass consume its part of the stream first
+			super.restoreState(in);
+
+			DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in);
+
+			restoredState = streamWrapper.readUTF();
+		}
+	}
+
+	/**
+	 * Sink that counts the elements it receives, both in a local field and in an
+	 * {@link IntCounter} accumulator registered under the name given to the constructor.
+	 */
+	public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
+		private static final long serialVersionUID = 1L;
+
+		int count = 0;
+
+		private final String accumulatorName;
+
+		public AccumulatorCountingSink(String accumulatorName) {
+			this.accumulatorName = accumulatorName;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			// register the counter under the configured name before any element arrives
+			final IntCounter elementCounter = new IntCounter();
+			getRuntimeContext().addAccumulator(accumulatorName, elementCounter);
+		}
+
+		@Override
+		public void invoke(T value) throws Exception {
+			// one tick per element, locally and in the accumulator
+			count += 1;
+			getRuntimeContext().getAccumulator(accumulatorName).add(1);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
deleted file mode 100644
index 1df7938..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing.utils;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.checkpointing.utils.SavepointUtil.SavepointCondition;
-import org.apache.flink.test.checkpointing.utils.SavepointUtil.SavepointTestJob;
-import org.apache.flink.util.Collector;
-
-public class UserFunctionStateJob implements SavepointTestJob {
-
-	@Override
-	public JobGraph createJobGraph() {
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		// we only test memory state backend yet
-		env.setStateBackend(new MemoryStateBackend());
-		env.enableCheckpointing(500);
-		env.setParallelism(1);
-		env.setMaxParallelism(1);
-
-		// create source
-		final DataStream<Tuple2<Long, Long>> source = env
-			.addSource(new SourceFunction<Tuple2<Long, Long>>() {
-
-				private volatile boolean isRunning = true;
-
-				@Override
-				public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
-					while (isRunning) {
-						synchronized (ctx.getCheckpointLock()) {
-							ctx.collect(new Tuple2<>(1L, 1L));
-						}
-					}
-				}
-
-				@Override
-				public void cancel() {
-					isRunning = false;
-				}
-			}).uid("CustomSourceFunction");
-
-		// non-keyed operator state
-		source.flatMap(new SumFlatMapperNonKeyedCheckpointed()).uid("SumFlatMapperNonKeyedCheckpointed").startNewChain().print();
-
-		return env.getStreamGraph().getJobGraph();
-	}
-
-	@Override
-	public SavepointCondition[] getSavepointCondition() {
-		return new SavepointCondition[] {
-			new SavepointCondition(SumFlatMapperNonKeyedCheckpointed.class, 0, 4)
-		};
-	}
-
-	public static class SumFlatMapperNonKeyedCheckpointed extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-			implements Checkpointed<Tuple2<Long, Long>> {
-
-		private transient Tuple2<Long, Long> sum;
-
-		@Override
-		public void restoreState(Tuple2<Long, Long> state) throws Exception {
-			sum = state;
-		}
-
-		@Override
-		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-			if (SavepointUtil.allowStateChange()) {
-				if (sum == null) {
-					sum = value;
-					out.collect(sum);
-				} else {
-					sum.f1 += value.f1;
-					out.collect(sum);
-				}
-			}
-
-			SavepointUtil.triggerOrTestSavepoint(
-				this,
-				new Tuple2<>(value.f1, value.f1 * 4),
-				sum);
-		}
-
-		@Override
-		public Tuple2<Long, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return sum;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint
new file mode 100644
index 0000000..f2f6dcd
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb
new file mode 100644
index 0000000..e63038b
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb differ