Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:20 UTC
[05/15] flink git commit: [FLINK-5366] Rework SavepointUtil into SavepointMigrationTestBase / add test
[FLINK-5366] Rework SavepointUtil into SavepointMigrationTestBase / add test
This also changes how the savepoint is performed: we now wait on
accumulators to signal that a job is ready for savepointing.
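As a minimal sketch of the new pattern (illustrative only, not part of this
commit; ExampleMigrationITCase, createSavepoint() and the savepoint path are
made-up names, while the other identifiers come from the diff below): the job
counts emitted elements in a named accumulator, and executeAndSavepoint()
polls the running job's accumulators until the expected value is reached
before triggering the savepoint.

    package org.apache.flink.test.checkpointing.utils;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class ExampleMigrationITCase extends SavepointMigrationTestBase {

        // invoked manually, like the @Ignore'd savepoint-creating tests below
        public void createSavepoint() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);

            env.addSource(new SourceFunction<Long>() {
                    private volatile boolean isRunning = true;

                    @Override
                    public void run(SourceContext<Long> ctx) throws Exception {
                        for (long i = 0; i < 4; i++) {
                            ctx.collect(i);
                        }
                        // keep the job alive so the savepoint can still be drawn
                        while (isRunning) {
                            Thread.sleep(20);
                        }
                    }

                    @Override
                    public void cancel() {
                        isRunning = false;
                    }
                })
                // the sink increments the "NUM_EXPECTED_ELEMENTS" accumulator once per element
                .addSink(new StatefulUDFSavepointMigrationITCase.AccumulatorCountingSink<Long>(
                    "NUM_EXPECTED_ELEMENTS"));

            // polls the accumulators until NUM_EXPECTED_ELEMENTS reaches 4, then
            // triggers a savepoint and moves the resulting file to the given path
            executeAndSavepoint(
                env,
                "src/test/resources/example-savepoint",
                new Tuple2<>("NUM_EXPECTED_ELEMENTS", 4));
        }
    }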
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cbd9f5d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cbd9f5d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cbd9f5d
Branch: refs/heads/master
Commit: 2cbd9f5d1ba43059b8bf748f97d2392b1e8f0ab3
Parents: 74df763
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Dec 19 12:18:49 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100
----------------------------------------------------------------------
.../utils/SavepointMigrationTestBase.java | 241 ++++++++
.../test/checkpointing/utils/SavepointUtil.java | 341 -----------
.../StatefulUDFSavepointMigrationITCase.java | 562 +++++++++++++++++++
.../utils/UserFunctionStateJob.java | 113 ----
...eful-udf-migration-itcase-flink1.1-savepoint | Bin 0 -> 27146 bytes
...-migration-itcase-flink1.1-savepoint-rocksdb | Bin 0 -> 22283 bytes
6 files changed, 803 insertions(+), 454 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
new file mode 100644
index 0000000..80a66ac
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.fail;
+
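+/**
+ * Base class for savepoint migration tests: starts a local Flink
+ * mini-cluster, runs a job until the expected accumulator values are
+ * reached, triggers and stores a savepoint, and can resubmit a job with
+ * restore settings pointing at a stored savepoint.
+ */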
+public class SavepointMigrationTestBase extends TestBaseUtils {
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);
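+ // note: this deadline is shared by all tests of one class run and starts
+ // counting down as soon as the class is loaded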
+ private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
+ protected static final int DEFAULT_PARALLELISM = 4;
+ protected LocalFlinkMiniCluster cluster = null;
+
+ protected static String getResourceFilename(String filename) {
+ ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader();
+ URL resource = cl.getResource(filename);
+ if (resource == null) {
+ throw new NullPointerException("Missing savepoint resource: " + filename);
+ }
+ return resource.getFile();
+ }
+
+ @Before
+ public void setup() throws Exception {
+
+ // Flink configuration
+ final Configuration config = new Configuration();
+
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
+
+ final File checkpointDir = tempFolder.newFolder("checkpoints").getAbsoluteFile();
+ final File savepointDir = tempFolder.newFolder("savepoints").getAbsoluteFile();
+
+ if (!checkpointDir.exists() || !savepointDir.exists()) {
+ throw new Exception("Test setup failed: failed to create (temporary) directories.");
+ }
+
+ LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
+ LOG.info("Created savepoint directory: " + savepointDir + ".");
+
+ config.setString(ConfigConstants.STATE_BACKEND, "memory");
+ config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
+ config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
+ config.setString("state.savepoints.dir", savepointDir.toURI().toString());
+
+ cluster = TestBaseUtils.startCluster(config, false);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+ }
+
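+ /**
+ * Executes the given job, waits until all expected accumulator values are
+ * reported, then triggers a savepoint and moves the resulting savepoint
+ * file to savepointPath.
+ */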
+ protected void executeAndSavepoint(
+ StreamExecutionEnvironment env,
+ String savepointPath,
+ Tuple2<String, Integer>... expectedAccumulators) throws Exception {
+
+ // Retrieve the job manager
+ ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+
+ // Submit the job
+ JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+ JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
+
+ LOG.info("Submitted job {} and waiting...", jobSubmissionResult.getJobID());
+
+ StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+
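+ // poll the running job's accumulators until all expected values are
+ // reported or the deadline expires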
+ boolean done = false;
+ while (DEADLINE.hasTimeLeft()) {
+ Thread.sleep(100);
+ Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID());
+
+ boolean allDone = true;
+ for (Tuple2<String, Integer> acc : expectedAccumulators) {
+ Integer numFinished = (Integer) accumulators.get(acc.f0);
+ if (numFinished == null) {
+ allDone = false;
+ break;
+ }
+ if (!numFinished.equals(acc.f1)) {
+ allDone = false;
+ break;
+ }
+ }
+ if (allDone) {
+ done = true;
+ break;
+ }
+ }
+
+ if (!done) {
+ fail("Did not see the expected accumulator results within time limit.");
+ }
+
+ LOG.info("Triggering savepoint.");
+ // Flink 1.2
+ final Future<Object> savepointResultFuture =
+ jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID(), Option.<String>empty()), DEADLINE.timeLeft());
+
+ // Flink 1.1
+// final Future<Object> savepointResultFuture =
+// jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID()), DEADLINE.timeLeft());
+
+ Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());
+
+ if (savepointResult instanceof JobManagerMessages.TriggerSavepointFailure) {
+ fail("Error drawing savepoint: " + ((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause());
+ }
+
+ // the JobManager reports where it stored the savepoint; we have to retrieve it
+ final String jobmanagerSavepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath();
+ LOG.info("Saved savepoint: " + jobmanagerSavepointPath);
+
+ // Flink 1.2
+ FileUtils.moveFile(new File(new URI(jobmanagerSavepointPath).getPath()), new File(savepointPath));
+
+ // Flink 1.1
+ // Retrieve the savepoint from the testing job manager
+// LOG.info("Requesting the savepoint.");
+// Future<Object> savepointFuture = jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(jobmanagerSavepointPath), DEADLINE.timeLeft());
+//
+// Savepoint savepoint = ((TestingJobManagerMessages.ResponseSavepoint) Await.result(savepointFuture, DEADLINE.timeLeft())).savepoint();
+// LOG.info("Retrieved savepoint: " + jobmanagerSavepointPath + ".");
+//
+// LOG.info("Storing savepoint to file.");
+// Configuration config = new Configuration();
+// config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
+// config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, "file:///Users/aljoscha/Downloads");
+// String path = org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.createFromConfig(config).storeSavepoint(savepoint);
+//
+// FileUtils.moveFile(new File(new URI(path).getPath()), new File(savepointPath));
+ }
+
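+ /**
+ * Submits the given job with restore settings pointing at savepointPath
+ * and waits until all expected accumulator values are reported.
+ */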
+ protected void restoreAndExecute(
+ StreamExecutionEnvironment env,
+ String savepointPath,
+ Tuple2<String, Integer>... expectedAccumulators) throws Exception {
+
+ // Retrieve the job manager
+ ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+
+ // Submit the job
+ JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+
+ JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
+
+ StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+
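+ // poll the accumulators of the restored job until all expected values are
+ // reported or the deadline expires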
+ boolean done = false;
+ while (DEADLINE.hasTimeLeft()) {
+ Thread.sleep(100);
+ Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID());
+
+ boolean allDone = true;
+ for (Tuple2<String, Integer> acc : expectedAccumulators) {
+ Integer numFinished = (Integer) accumulators.get(acc.f0);
+ if (numFinished == null) {
+ System.out.println("NO ACC FOR " + acc);
+ allDone = false;
+ break;
+ }
+ if (!numFinished.equals(acc.f1)) {
+ System.out.println("TO LOW FOR ACC" + acc);
+ allDone = false;
+ break;
+ }
+ }
+ System.out.println("ACC: " + accumulators);
+ if (allDone) {
+ done = true;
+ break;
+ }
+ }
+
+ if (!done) {
+ fail("Did not see the expected accumulator results within time limit.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
deleted file mode 100644
index 85e21c5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing.utils;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-public class SavepointUtil {
-
- // list of JobGraphs to create savepoints for
- private static final ArrayList<Class<? extends SavepointTestJob>> savepointJobs = new ArrayList<>();
- static {
- savepointJobs.add(UserFunctionStateJob.class);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(SavepointUtil.class);
- private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
- private static final String SAVEPOINT_BASE_DIR = "./flink-tests/src/test/resources/savepoints/";
-
- private static final int STATE_WAIT_FOR_JOB = 0;
- private static final int STATE_REQUEST_SAVEPOINT = 1;
- private static final int STATE_SAVEPOINT_DONE = 2;
- private static final int STATE_WAIT_FOR_TEST_JOB = 3;
- private static final int STATE_TEST_RESULT = 4;
- private static final int STATE_END = 5;
-
- private static volatile int state = STATE_WAIT_FOR_JOB;
-
- private static TestingCluster flink = null;
- private static ActorGateway jobManager = null;
- private static JobID jobId = null;
- private static File savepointDir = null;
- private static Exception testResult = null;
-
- public static void main(String[] args) throws Exception {
-
- // clean up
-// FileUtils.deleteDirectory(new File(SAVEPOINT_BASE_DIR));
-
- for (Class<? extends SavepointTestJob> testJob : savepointJobs) {
- SavepointTestJob job = testJob.newInstance();
-
-// runJobAndCreateSavepoint(job);
-
- runJobAndCompareState(job);
-
- triggerEndOfTest();
- }
- }
-
- public static synchronized void triggerSavepoint() {
- SavepointUtil.state = SavepointUtil.STATE_REQUEST_SAVEPOINT;
- }
-
- public static synchronized boolean allowStateChange() {
- return SavepointUtil.state < SavepointUtil.STATE_REQUEST_SAVEPOINT;
- }
-
- public static synchronized void triggerOrTestSavepoint(RichFunction function, Object expected, Object actual) throws Exception {
- if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_TEST_JOB) {
- if (expected.equals(actual)) {
- LOG.info("Test was successful.");
- SavepointUtil.testResult = null;
- SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
- } else {
- LOG.info("Test failed.");
- SavepointUtil.testResult = new Exception("Comparison of state failed. Expected: " + expected + " but was: " + actual);
- SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
- }
- } else if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_JOB) {
- final StateCondition condition = new StateCondition(function.getClass(), function.getRuntimeContext().getIndexOfThisSubtask());
- if (!testCounters.containsKey(condition)) {
- testCounters.put(condition, 0);
- }
- final Integer counter = testCounters.get(condition);
- testCounters.put(condition, counter + 1);
- // check if all counters are ready
- if (checkIfReadyForSavepoint()) {
- SavepointUtil.state = SavepointUtil.STATE_REQUEST_SAVEPOINT;
- }
- }
- }
-
- public static void triggerEndOfTest() throws Exception {
- LOG.info("Cancelling Flink.");
- if (flink != null) {
- flink.stop();
- }
- SavepointUtil.state = SavepointUtil.STATE_END;
- }
-
- public static void runJobAndCreateSavepoint(SavepointTestJob job) throws Exception {
- LOG.info("Waiting for job.");
- SavepointUtil.state = SavepointUtil.STATE_WAIT_FOR_JOB;
-
- final Thread t = new Thread(new SavepointPerformer());
- t.start();
-
- runJob(job);
-
- while(SavepointUtil.state != SavepointUtil.STATE_SAVEPOINT_DONE && DEADLINE.hasTimeLeft()) {
- Thread.sleep(100);
- }
- }
-
- public static void runJobAndCompareState(SavepointTestJob job) throws Exception {
- LOG.info("Waiting for test job.");
- SavepointUtil.state = SavepointUtil.STATE_WAIT_FOR_TEST_JOB;
-
- runJob(job);
-
- while(SavepointUtil.state != SavepointUtil.STATE_TEST_RESULT && DEADLINE.hasTimeLeft()) {
- Thread.sleep(100);
- }
-
- if (SavepointUtil.state != SavepointUtil.STATE_TEST_RESULT) {
- throw new Exception("No test result available.");
- }
- if (testResult != null) {
- throw testResult;
- }
- }
-
- public static void setTestResult(Exception e) {
- SavepointUtil.testResult = e;
- SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static void runJob(SavepointTestJob job) throws Exception {
- // Config
- int numTaskManagers = 2;
- int numSlotsPerTaskManager = 2;
- int parallelism = numTaskManagers * numSlotsPerTaskManager;
- String savepointPath = SAVEPOINT_BASE_DIR + job.getClass().getSimpleName();
-
- // Flink configuration
- final Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-
- final File checkpointDir = File.createTempFile("checkpoints", Long.toString(System.nanoTime()));
- savepointDir = new File(savepointPath);
- savepointDir.mkdirs();
-
- if (!checkpointDir.exists() || !savepointDir.exists()) {
- throw new Exception("Test setup failed: failed to create (temporary) directories.");
- }
-
- LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
- LOG.info("Created savepoint directory: " + savepointDir + ".");
-
- config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
- config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
- config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
- config.setString("state.savepoints.dir", savepointDir.toURI().toString());
-
- LOG.info("Flink configuration: " + config + ".");
-
- // Start Flink
- flink = new TestingCluster(config);
- flink.start();
-
- // Retrieve the job manager
- jobManager = Await.result(flink.leaderGateway().future(), DEADLINE.timeLeft());
-
- // Submit the job
- final JobGraph jobGraph = job.createJobGraph();
- if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_JOB) {
- savepointCondition = job.getSavepointCondition();
- testCounters.clear();
- } else if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_TEST_JOB) {
- final File[] dir = savepointDir.listFiles();
- if (dir.length == 0) {
- throw new RuntimeException("Savepoint of " + job.getClass().getSimpleName() + " does not exist.");
- }
- jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(dir[0].getAbsolutePath()));
- }
- jobId = jobGraph.getJobID();
-
- LOG.info("Submitting job " + jobGraph.getJobID() + " and waiting...");
-
- flink.submitJobDetached(jobGraph);
- }
-
- private static final HashMap<StateCondition, Integer> testCounters = new HashMap<>();
- private static SavepointCondition[] savepointCondition = null;
-
- private static boolean checkIfReadyForSavepoint() {
- for (SavepointCondition condition : savepointCondition) {
- final StateCondition stateCondition = new StateCondition(condition.clazz, condition.subtask);
- if (!testCounters.containsKey(stateCondition) || testCounters.get(stateCondition) != condition.invocation) {
- return false;
- }
- }
- return true;
- }
-
- private static void performSavepointAndShutdown() throws Exception {
- LOG.info("Triggering a savepoint.");
-
- // Flink 1.2
- final Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.<String>empty()), DEADLINE.timeLeft());
- // Flink 1.1
-// final Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId), DEADLINE.timeLeft());
-
- final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) Await.result(savepointPathFuture, DEADLINE.timeLeft())).savepointPath();
- LOG.info("Saved savepoint: " + savepointPath);
-
- // Retrieve the savepoint from the testing job manager
- LOG.info("Requesting the savepoint.");
- Future<Object> savepointFuture = jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(savepointPath), DEADLINE.timeLeft());
-
- Savepoint savepoint = ((TestingJobManagerMessages.ResponseSavepoint) Await.result(savepointFuture, DEADLINE.timeLeft())).savepoint();
- LOG.info("Retrieved savepoint: " + savepointPath + ".");
-
- LOG.info("Storing savepoint to file.");
-
- // Flink 1.2
- // it might be that the savepoint has already been written to file in Flink 1.2
- // this is just the command how to do it in 1.2
-// org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepoint(savepointDir.getAbsolutePath(), savepoint);
- // Flink 1.1
- // this writes it for Flink 1.1
-// Configuration config = new Configuration();
-// config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-// config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, "file://" + savepointDir.getAbsolutePath());
-// org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.createFromConfig(config).storeSavepoint(savepoint);
-
- LOG.info("Cancelling Flink.");
- flink.stop();
-
- SavepointUtil.state = SavepointUtil.STATE_SAVEPOINT_DONE;
- }
-
- private static class StateCondition {
- private Class<?> clazz;
- private Integer subtask;
-
- StateCondition(Class<?> clazz, Integer subtask) {
- this.clazz = clazz;
- this.subtask = subtask;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- StateCondition that = (StateCondition) o;
-
- return clazz.equals(that.clazz) && subtask.equals(that.subtask);
- }
-
- @Override
- public int hashCode() {
- int result = clazz.hashCode();
- result = 31 * result + subtask.hashCode();
- return result;
- }
- }
-
- public static class SavepointCondition {
- Class<? extends RichFunction> clazz;
- int subtask;
- int invocation;
-
- SavepointCondition(Class<? extends RichFunction> clazz, int subtask, int invocation) {
- this.clazz = clazz;
- this.subtask = subtask;
- this.invocation = invocation;
- }
- }
-
- public interface SavepointTestJob {
- JobGraph createJobGraph();
-
- SavepointCondition[] getSavepointCondition();
- }
-
- private static class SavepointPerformer implements Runnable {
-
- @Override
- public void run() {
- try {
- while (SavepointUtil.state != SavepointUtil.STATE_END) {
- Thread.sleep(100);
- if (SavepointUtil.state == SavepointUtil.STATE_REQUEST_SAVEPOINT) {
- try {
- performSavepointAndShutdown();
- } catch (Exception e) {
- throw new RuntimeException("Performing savepoint failed.", e);
- }
- }
- }
- } catch (InterruptedException e) {
- // stop execution
- }
- LOG.info("SavepointPerformer Thread finished.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
new file mode 100644
index 0000000..cc21683
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.1 savepoint.
+ *
+ * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
+ */
+public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestBase {
+ private static final int NUM_SOURCE_ELEMENTS = 4;
+ private static final String EXPECTED_ELEMENTS_ACCUMULATOR = "NUM_EXPECTED_ELEMENTS";
+ private static final String SUCCESSFUL_CHECK_ACCUMULATOR = "SUCCESSFUL_CHECKS";
+
+ /**
+ * This has to be manually executed to create the savepoint on Flink 1.1.
+ */
+ @Test
+ @Ignore
+ public void testCreateSavepointOnFlink11() throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ // for now we only test the memory state backend
+ env.setStateBackend(new MemoryStateBackend());
+ env.enableCheckpointing(500);
+ env.setParallelism(4);
+ env.setMaxParallelism(4);
+
+ // build the test pipeline
+ env
+ .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+ .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+ .keyBy(0)
+ .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+ .keyBy(0)
+ .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+ .keyBy(0)
+ .transform(
+ "custom_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+ .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+ executeAndSavepoint(
+ env,
+ "src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint",
+ new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+ }
+
+ /**
+ * This has to be manually executed to create the savepoint on Flink 1.1.
+ */
+ @Test
+ @Ignore
+ public void testCreateSavepointOnFlink11WithRocksDB() throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ RocksDBStateBackend rocksBackend =
+ new RocksDBStateBackend(new MemoryStateBackend());
+// rocksBackend.enableFullyAsyncSnapshots();
+ env.setStateBackend(rocksBackend);
+ env.enableCheckpointing(500);
+ env.setParallelism(4);
+ env.setMaxParallelism(4);
+
+ // build the test pipeline
+ env
+ .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+ .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+ .keyBy(0)
+ .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+ .keyBy(0)
+ .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+ .keyBy(0)
+ .transform(
+ "custom_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+ .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+ executeAndSavepoint(
+ env,
+ "src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb",
+ new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+ }
+
+ @Test
+ public void testSavepointRestoreFromFlink11() throws Exception {
+
+ final int EXPECTED_SUCCESSFUL_CHECKS = 21;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ // for now we only test the memory state backend
+ env.setStateBackend(new MemoryStateBackend());
+ env.enableCheckpointing(500);
+ env.setParallelism(4);
+ env.setMaxParallelism(4);
+
+ // build the test pipeline
+ env
+ .addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+ .flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+ .keyBy(0)
+ .flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+ .keyBy(0)
+ .flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+ .keyBy(0)
+ .transform(
+ "custom_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+ .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+ restoreAndExecute(
+ env,
+ getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"),
+ new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+ }
+
+ @Test
+ public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception {
+
+ final int EXPECTED_SUCCESSFUL_CHECKS = 21;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ // this restore test uses the RocksDB state backend (backed by a memory state backend)
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ env.enableCheckpointing(500);
+ env.setParallelism(4);
+ env.setMaxParallelism(4);
+
+ // build the test pipeline
+ env
+ .addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+ .flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+ .keyBy(0)
+ .flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+ .keyBy(0)
+ .flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+ .keyBy(0)
+ .transform(
+ "custom_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+ .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+ restoreAndExecute(
+ env,
+ getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb"),
+ new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+ }
+
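+ /**
+ * Source that emits its elements once under the checkpoint lock and then
+ * idles so the job stays alive while the savepoint is drawn. Its legacy
+ * Checkpointed state is a known marker string that is verified on restore.
+ */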
+ private static class LegacyCheckpointedSource
+ implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
+
+ public static final String CHECKPOINTED_STRING = "Here be dragons!";
+
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean isRunning = true;
+
+ private final int numElements;
+
+ public LegacyCheckpointedSource(int numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+ synchronized (ctx.getCheckpointLock()) {
+ for (long i = 0; i < numElements; i++) {
+ ctx.collect(new Tuple2<>(i, i));
+ }
+ }
+ while (isRunning) {
+ Thread.sleep(20);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ @Override
+ public void restoreState(String state) throws Exception {
+ assertEquals(CHECKPOINTED_STRING, state);
+ }
+
+ @Override
+ public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return CHECKPOINTED_STRING;
+ }
+ }
+
+ private static class RestoringCheckingSource
+ extends RichSourceFunction<Tuple2<Long, Long>>
+ implements CheckpointedRestoring<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean isRunning = true;
+
+ private final int numElements;
+
+ private String restoredState;
+
+ public RestoringCheckingSource(int numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+ assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+
+ synchronized (ctx.getCheckpointLock()) {
+ for (long i = 0; i < numElements; i++) {
+ ctx.collect(new Tuple2<>(i, i));
+ }
+ }
+
+ while (isRunning) {
+ Thread.sleep(20);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ @Override
+ public void restoreState(String state) throws Exception {
+ restoredState = state;
+ }
+ }
+
+ public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements Checkpointed<Tuple2<String, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final Tuple2<String, Long> CHECKPOINTED_TUPLE =
+ new Tuple2<>("hello", 42L);
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+ }
+
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ }
+
+ @Override
+ public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return CHECKPOINTED_TUPLE;
+ }
+ }
+
+ public static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Tuple2<String, Long> restoredState;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+
+ }
+
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ restoredState = state;
+ }
+ }
+
+ public static class LegacyCheckpointedFlatMapWithKeyedState
+ extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements Checkpointed<Tuple2<String, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final Tuple2<String, Long> CHECKPOINTED_TUPLE =
+ new Tuple2<>("hello", 42L);
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ getRuntimeContext().getState(stateDescriptor).update(value.f1);
+ }
+
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ }
+
+ @Override
+ public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return CHECKPOINTED_TUPLE;
+ }
+ }
+
+ public static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Tuple2<String, Long> restoredState;
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+ if (state == null) {
+ throw new RuntimeException("Missing key value state for " + value);
+ }
+
+ assertEquals(value.f1, state.value());
+ assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+ }
+
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ restoredState = state;
+ }
+ }
+
+ public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ getRuntimeContext().getState(stateDescriptor).update(value.f1);
+ }
+ }
+
+ public static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+ if (state == null) {
+ throw new RuntimeException("Missing key value state for " + value);
+ }
+
+ assertEquals(value.f1, state.value());
+ getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+ }
+ }
+
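+ /**
+ * Custom operator that forwards elements unchanged. On Flink 1.1 it also
+ * wrote a marker string into its legacy operator state (see the
+ * commented-out snapshotOperatorState() below); RestoringCheckingUdfOperator
+ * reads that string back in restoreState().
+ */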
+ public static class CheckpointedUdfOperator
+ extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+ implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ private static final long serialVersionUID = 1L;
+
+ private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
+
+ public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+ super(userFunction);
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+ output.collect(element);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ output.emitWatermark(mark);
+ }
+
+ // Flink 1.1
+// @Override
+// public StreamTaskState snapshotOperatorState(
+// long checkpointId, long timestamp) throws Exception {
+// StreamTaskState result = super.snapshotOperatorState(checkpointId, timestamp);
+//
+// AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(
+// checkpointId,
+// timestamp);
+//
+// out.writeUTF(CHECKPOINTED_STRING);
+//
+// result.setOperatorState(out.closeAndGetHandle());
+//
+// return result;
+// }
+ }
+
+ public static class RestoringCheckingUdfOperator
+ extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+ implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ private static final long serialVersionUID = 1L;
+
+ private String restoredState;
+
+ public RestoringCheckingUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+ super(userFunction);
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+ userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
+
+ assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ output.emitWatermark(mark);
+ }
+
+ @Override
+ public void restoreState(FSDataInputStream in) throws Exception {
+ super.restoreState(in);
+
+ DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in);
+
+ restoredState = streamWrapper.readUTF();
+ }
+ }
+
+ public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ private final String accumulatorName;
+
+ int count = 0;
+
+ public AccumulatorCountingSink(String accumulatorName) {
+ this.accumulatorName = accumulatorName;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(accumulatorName, new IntCounter());
+ }
+
+ @Override
+ public void invoke(T value) throws Exception {
+ count++;
+ getRuntimeContext().getAccumulator(accumulatorName).add(1);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
deleted file mode 100644
index 1df7938..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing.utils;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.checkpointing.utils.SavepointUtil.SavepointCondition;
-import org.apache.flink.test.checkpointing.utils.SavepointUtil.SavepointTestJob;
-import org.apache.flink.util.Collector;
-
-public class UserFunctionStateJob implements SavepointTestJob {
-
- @Override
- public JobGraph createJobGraph() {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- // we only test memory state backend yet
- env.setStateBackend(new MemoryStateBackend());
- env.enableCheckpointing(500);
- env.setParallelism(1);
- env.setMaxParallelism(1);
-
- // create source
- final DataStream<Tuple2<Long, Long>> source = env
- .addSource(new SourceFunction<Tuple2<Long, Long>>() {
-
- private volatile boolean isRunning = true;
-
- @Override
- public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
- while (isRunning) {
- synchronized (ctx.getCheckpointLock()) {
- ctx.collect(new Tuple2<>(1L, 1L));
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
- }).uid("CustomSourceFunction");
-
- // non-keyed operator state
- source.flatMap(new SumFlatMapperNonKeyedCheckpointed()).uid("SumFlatMapperNonKeyedCheckpointed").startNewChain().print();
-
- return env.getStreamGraph().getJobGraph();
- }
-
- @Override
- public SavepointCondition[] getSavepointCondition() {
- return new SavepointCondition[] {
- new SavepointCondition(SumFlatMapperNonKeyedCheckpointed.class, 0, 4)
- };
- }
-
- public static class SumFlatMapperNonKeyedCheckpointed extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
- implements Checkpointed<Tuple2<Long, Long>> {
-
- private transient Tuple2<Long, Long> sum;
-
- @Override
- public void restoreState(Tuple2<Long, Long> state) throws Exception {
- sum = state;
- }
-
- @Override
- public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
- if (SavepointUtil.allowStateChange()) {
- if (sum == null) {
- sum = value;
- out.collect(sum);
- } else {
- sum.f1 += value.f1;
- out.collect(sum);
- }
- }
-
- SavepointUtil.triggerOrTestSavepoint(
- this,
- new Tuple2<>(value.f1, value.f1 * 4),
- sum);
- }
-
- @Override
- public Tuple2<Long, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return sum;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint
new file mode 100644
index 0000000..f2f6dcd
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint differ
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb
new file mode 100644
index 0000000..e63038b
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb differ