You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:22 UTC
[15/50] [abbrv] flink git commit: [Flink-5892] Restore state on
operator level
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 7f24cd3..aa0f08d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
@@ -140,7 +141,9 @@ public class StreamingJobGraphGenerator {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
- setChaining(hashes, legacyHashes);
+ Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
+
+ setChaining(hashes, legacyHashes, chainedOperatorHashes);
setPhysicalEdges();
@@ -190,9 +193,9 @@ public class StreamingJobGraphGenerator {
*
* <p>This will recursively create all {@link JobVertex} instances.
*/
- private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
+ private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
- createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0);
+ createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
}
}
@@ -201,7 +204,8 @@ public class StreamingJobGraphGenerator {
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
- int chainIndex) {
+ int chainIndex,
+ Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
if (!builtVertices.contains(startNodeId)) {
@@ -220,20 +224,27 @@ public class StreamingJobGraphGenerator {
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
- createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1));
+ createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
- createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0);
+ createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
+ }
+
+ List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.get(startNodeId);
+ if (operatorHashes == null) {
+ operatorHashes = new ArrayList<>();
+ chainedOperatorHashes.put(startNodeId, operatorHashes);
}
+ operatorHashes.add(new Tuple2<>(hashes.get(currentNodeId), legacyHashes.get(1).get(currentNodeId)));
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
StreamConfig config = currentNodeId.equals(startNodeId)
- ? createJobVertex(startNodeId, hashes, legacyHashes)
+ ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
@@ -308,7 +319,8 @@ public class StreamingJobGraphGenerator {
private StreamConfig createJobVertex(
Integer streamNodeId,
Map<Integer, byte[]> hashes,
- List<Map<Integer, byte[]>> legacyHashes) {
+ List<Map<Integer, byte[]>> legacyHashes,
+ Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
JobVertex jobVertex;
StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
@@ -330,18 +342,32 @@ public class StreamingJobGraphGenerator {
}
}
+ List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId);
+ List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
+ List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>();
+ if (chainedOperators != null) {
+ for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
+ chainedOperatorVertexIds.add(new OperatorID(chainedOperator.f0));
+ userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new OperatorID(chainedOperator.f1) : null);
+ }
+ }
+
if (streamNode.getInputFormat() != null) {
jobVertex = new InputFormatVertex(
chainedNames.get(streamNodeId),
jobVertexId,
- legacyJobVertexIds);
+ legacyJobVertexIds,
+ chainedOperatorVertexIds,
+ userDefinedChainedOperatorVertexIds);
TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
} else {
jobVertex = new JobVertex(
chainedNames.get(streamNodeId),
jobVertexId,
- legacyJobVertexIds);
+ legacyJobVertexIds,
+ chainedOperatorVertexIds,
+ userDefinedChainedOperatorVertexIds);
}
jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index 05aa694..a897674 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -47,7 +47,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* Tests the {@link StreamNode} hash assignment during translation from {@link StreamGraph} to
@@ -392,10 +391,10 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
}
/**
- * Tests that a manual hash for an intermediate chain node throws an Exception.
+ * Tests that a manual hash for an intermediate chain node is accepted.
*/
- @Test(expected = UnsupportedOperationException.class)
- public void testManualHashAssignmentForIntermediateNodeInChainThrowsException() throws Exception {
+ @Test
+ public void testManualHashAssignmentForIntermediateNodeInChain() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(4);
@@ -409,9 +408,6 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
/**
* Tests that a manual hash at the beginning of a chain is accepted.
- *
- * <p>This should work, because the ID is used at the beginning of a chain. This is currently
- * not allowed for intermediate nodes (see {@link #testManualHashAssignmentForIntermediateNodeInChainThrowsException()}).
*/
@Test
public void testManualHashAssignmentForStartNodeInInChain() throws Exception {
@@ -446,7 +442,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
}
@Test
- public void testUserProvidedHashingOnChainNotSupported() {
+ public void testUserProvidedHashingOnChainSupported() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
@@ -455,11 +451,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
.keyBy(new NoOpKeySelector())
.reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
- try {
- env.getStreamGraph().getJobGraph();
- fail();
- } catch (UnsupportedOperationException ignored) {
- }
+ env.getStreamGraph().getJobGraph();
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index e0de7d2..72a1b63 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -32,28 +32,32 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -93,7 +97,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -102,7 +108,6 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSav
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -310,30 +315,43 @@ public class SavepointITCase extends TestLogger {
};
}};
+ ExecutionGraph graph = (ExecutionGraph) ((JobManagerMessages.JobFound) Await.result(jobManager.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()), deadline.timeLeft())).executionGraph();
+
// - Verification START -------------------------------------------
String errMsg = "Error during gathering of TaskDeploymentDescriptors";
- assertNull(errMsg, error[0]);
+ if (error[0] != null) {
+ throw new RuntimeException(error[0]);
+ }
+
+ Map<OperatorID, Tuple2<Integer, ExecutionJobVertex>> operatorToJobVertexMapping = new HashMap<>();
+ for (ExecutionJobVertex task : graph.getVerticesTopologically()) {
+ List<OperatorID> operatorIDs = task.getOperatorIDs();
+ for (int x = 0; x < operatorIDs.size(); x++) {
+ operatorToJobVertexMapping.put(operatorIDs.get(x), new Tuple2<>(x, task));
+ }
+ }
// Verify that all tasks, which are part of the savepoint
// have a matching task deployment descriptor.
- for (TaskState taskState : savepoint.getTaskStates()) {
- Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(taskState.getJobVertexID());
+ for (OperatorState operatorState : savepoint.getOperatorStates()) {
+ Tuple2<Integer, ExecutionJobVertex> chainIndexAndJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
+ Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(chainIndexAndJobVertex.f1.getJobVertexId());
errMsg = "Missing task for savepoint state for operator "
- + taskState.getJobVertexID() + ".";
+ + operatorState.getOperatorID() + ".";
assertTrue(errMsg, taskTdds.size() > 0);
- assertEquals(taskState.getNumberCollectedStates(), taskTdds.size());
+ assertEquals(operatorState.getNumberCollectedStates(), taskTdds.size());
for (TaskDeploymentDescriptor tdd : taskTdds) {
- SubtaskState subtaskState = taskState.getState(tdd.getSubtaskIndex());
+ OperatorSubtaskState subtaskState = operatorState.getState(tdd.getSubtaskIndex());
assertNotNull(subtaskState);
errMsg = "Initial operator state mismatch.";
assertEquals(errMsg, subtaskState.getLegacyOperatorState(),
- tdd.getTaskStateHandles().getLegacyOperatorState());
+ tdd.getTaskStateHandles().getLegacyOperatorState().get(chainIndexAndJobVertex.f0));
}
}
@@ -360,15 +378,13 @@ public class SavepointITCase extends TestLogger {
// The checkpoint files
List<File> checkpointFiles = new ArrayList<>();
- for (TaskState stateForTaskGroup : savepoint.getTaskStates()) {
- for (SubtaskState subtaskState : stateForTaskGroup.getStates()) {
- ChainedStateHandle<StreamStateHandle> streamTaskState = subtaskState.getLegacyOperatorState();
+ for (OperatorState stateForTaskGroup : savepoint.getOperatorStates()) {
+ for (OperatorSubtaskState subtaskState : stateForTaskGroup.getStates()) {
+ StreamStateHandle streamTaskState = subtaskState.getLegacyOperatorState();
- for (int i = 0; i < streamTaskState.getLength(); i++) {
- if (streamTaskState.get(i) != null) {
- FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState.get(i);
- checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
- }
+ if (streamTaskState != null) {
+ FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState;
+ checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
new file mode 100644
index 0000000..2eecf49
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Abstract class to verify that it is possible to migrate a 1.2 savepoint to 1.3 and that the topology can be modified
+ * from that point on.
+ *
+ * <p>The verification is done in two steps:
+ * <ol><li>Migrate the job to 1.3 by submitting the same job used for the 1.2 savepoint, and create a new savepoint.</li>
+ * <li>Modify the job topology, and restore from the savepoint created in step 1.</li></ol>
+ */
+public abstract class AbstractOperatorRestoreTestBase {
+
+ @Rule
+ public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ private static ActorSystem actorSystem = null;
+ private static ActorGateway jobManager = null;
+ private static ActorGateway archiver = null;
+ private static ActorGateway taskManager = null;
+
+ private static final FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+ actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+ Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+ new Configuration(),
+ actorSystem,
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ Option.apply("jm"),
+ Option.apply("arch"),
+ TestingJobManager.class,
+ TestingMemoryArchivist.class);
+
+ jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+ archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+ Configuration tmConfig = new Configuration();
+ tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+
+ ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+ tmConfig,
+ ResourceID.generate(),
+ actorSystem,
+ "localhost",
+ Option.apply("tm"),
+ Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID)),
+ true,
+ TestingTaskManager.class);
+
+ taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+ // Wait until connected
+ Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+ Await.ready(taskManager.ask(msg, timeout), timeout);
+ }
+
+ @AfterClass
+ public static void tearDownCluster() {
+ if (actorSystem != null) {
+ actorSystem.shutdown();
+ }
+
+ if (archiver != null) {
+ archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ if (jobManager != null) {
+ jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ if (taskManager != null) {
+ taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+ }
+
+ @Test
+ public void testMigrationAndRestore() throws Throwable {
+ // submit 1.2 job and create a migrated 1.3 savepoint
+ String savepointPath = migrateJob();
+ // restore from migrated 1.3 savepoint
+ restoreJob(savepointPath);
+ }
+
+ private String migrateJob() throws Throwable {
+ URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
+ if (savepointResource == null) {
+ throw new IllegalArgumentException("Savepoint file does not exist.");
+ }
+ JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
+ jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
+
+ Object msg;
+ Object result;
+
+ // Submit job graph
+ msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
+ result = Await.result(jobManager.ask(msg, timeout), timeout);
+
+ if (result instanceof JobManagerMessages.JobResultFailure) {
+ JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
+ throw new Exception(failure.cause());
+ }
+ Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+
+ // Wait for all tasks to be running
+ msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
+ Await.result(jobManager.ask(msg, timeout), timeout);
+
+ // Cancel the job with a savepoint written to the target directory
+ File targetDirectory = tmpFolder.newFolder();
+ msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+ Future<Object> future = jobManager.ask(msg, timeout);
+ result = Await.result(future, timeout);
+
+ if (result instanceof JobManagerMessages.CancellationFailure) {
+ JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
+ throw new Exception(failure.cause());
+ }
+
+ String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
+
+ // Wait until canceled
+ msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
+ Await.ready(jobManager.ask(msg, timeout), timeout);
+
+ return savepointPath;
+ }
+
+ private void restoreJob(String savepointPath) throws Exception {
+ JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
+ jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
+
+ Object msg;
+ Object result;
+
+ // Submit job graph
+ msg = new JobManagerMessages.SubmitJob(jobToRestore, ListeningBehaviour.DETACHED);
+ result = Await.result(jobManager.ask(msg, timeout), timeout);
+
+ if (result instanceof JobManagerMessages.JobResultFailure) {
+ JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
+ throw new Exception(failure.cause());
+ }
+ Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+
+ msg = new JobManagerMessages.RequestJobStatus(jobToRestore.getJobID());
+ JobStatus status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(msg, timeout), timeout)).status();
+ while (!status.isTerminalState()) {
+ status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(msg, timeout), timeout)).status();
+ }
+
+ Assert.assertEquals(JobStatus.FINISHED, status);
+ }
+
+ private JobGraph createJobGraph(ExecutionMode mode) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+ env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setStateBackend(new MemoryStateBackend());
+
+ switch (mode) {
+ case MIGRATE:
+ createMigrationJob(env);
+ break;
+ case RESTORE:
+ createRestoredJob(env);
+ break;
+ }
+
+ return StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+ }
+
+ /**
+ * Recreates the job used to create the 1.2 savepoint.
+ *
+ * @param env StreamExecutionEnvironment to use
+ */
+ protected abstract void createMigrationJob(StreamExecutionEnvironment env);
+
+ /**
+ * Creates a modified version of the job used to create the 1.2 savepoint.
+ *
+ * @param env StreamExecutionEnvironment to use
+ */
+ protected abstract void createRestoredJob(StreamExecutionEnvironment env);
+
+ /**
+ * Returns the name of the savepoint directory to use, relative to "resources/operatorstate".
+ *
+ * @return savepoint directory to use
+ */
+ protected abstract String getMigrationSavepointName();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
new file mode 100644
index 0000000..f333aca
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore;
+
+/**
+ * Enum to control function behavior for the different test stages.
+ *
+ * <ul><li>{@link ExecutionMode#GENERATE} should be used when creating the 1.2 savepoint.</li>
+ * <li>{@link ExecutionMode#MIGRATE} should be used when migrating the 1.2 savepoint to 1.3.</li>
+ * <li>{@link ExecutionMode#RESTORE} should be used when restoring from the migrated 1.3 savepoint.</li></ul>
+ */
+public enum ExecutionMode {
+ GENERATE,
+ MIGRATE,
+ RESTORE
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
new file mode 100644
index 0000000..28cd15a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.keyed;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+public class KeyedComplexChainTest extends AbstractOperatorRestoreTestBase {
+
+ @Override
+ public void createMigrationJob(StreamExecutionEnvironment env) {
+ /*
+ * Topology (C(...) denotes a single operator chain): Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2)
+ */
+ SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.MIGRATE);
+
+ SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.MIGRATE, source);
+
+ SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.MIGRATE, window);
+
+ SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.MIGRATE, first);
+ }
+
+ @Override
+ protected void createRestoredJob(StreamExecutionEnvironment env) {
+ /*
+ * Topology (C(...) denotes a single operator chain): Source -> keyBy -> C(Window -> StatefulMap2) -> StatefulMap1
+ */
+ SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.RESTORE);
+
+ SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.RESTORE, source);
+
+ SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.RESTORE, window);
+
+ SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.RESTORE, second);
+ first.startNewChain();
+ }
+
+ @Override
+ protected final String getMigrationSavepointName() {
+ return "complexKeyed";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
new file mode 100644
index 0000000..6add7b2
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.keyed;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Savepoint generator to create the job used by the {@link KeyedComplexChainTest}.
+ *
+ * <p>The job should be cancelled manually through the REST API using the cancel-with-savepoint operation.
+ */
+public class KeyedJob {
+ // Entry point of the savepoint generator: runs the GENERATE-mode job locally; the "savepoint-path" argument is required.
+ public static void main(String[] args) throws Exception {
+ ParameterTool pt = ParameterTool.fromArgs(args);
+
+ String savepointsPath = pt.getRequired("savepoint-path");
+ // the path is registered under the savepoint directory key of the configuration handed to the local environment
+ Configuration config = new Configuration();
+ config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointsPath);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
+ env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE); // interval argument 500 (milliseconds per the Flink API), exactly-once mode
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ env.setStateBackend(new MemoryStateBackend());
+
+ /**
+ * Generated job, every operator built in GENERATE mode: Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2)
+ */
+
+ SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = createIntegerTupleSource(env, ExecutionMode.GENERATE);
+
+ SingleOutputStreamOperator<Integer> window = createWindowFunction(ExecutionMode.GENERATE, source);
+
+ SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.GENERATE, window);
+
+ SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.GENERATE, first);
+
+ env.execute("job");
+ }
+ // Adds an IntegerTupleSource running in the given mode to the environment.
+ public static SingleOutputStreamOperator<Tuple2<Integer, Integer>> createIntegerTupleSource(StreamExecutionEnvironment env, ExecutionMode mode) {
+ return env.addSource(new IntegerTupleSource(mode));
+ }
+ // Keys the input by tuple field 0 and applies a StatefulWindowFunction to count windows of size 1 (parallelism 4, uid "window").
+ public static SingleOutputStreamOperator<Integer> createWindowFunction(ExecutionMode mode, DataStream<Tuple2<Integer, Integer>> input) {
+ return input
+ .keyBy(0)
+ .countWindow(1)
+ .apply(new StatefulWindowFunction(mode))
+ .setParallelism(4)
+ .uid("window");
+ }
+ // Appends a StatefulStringStoringMap that stores the prefix "first" (parallelism 4, uid "first").
+ public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+ SingleOutputStreamOperator<Integer> map = input
+ .map(new StatefulStringStoringMap(mode, "first"))
+ .setParallelism(4);
+
+ // TODO: re-enable this when generating the actual 1.2 savepoint
+ //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
+ map.uid("first"); // with the guard commented out, the uid is set in every mode
+ //}
+
+ return map;
+ }
+ // Appends a StatefulStringStoringMap that stores the prefix "second" (parallelism 4, uid "second").
+ public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+ SingleOutputStreamOperator<Integer> map = input
+ .map(new StatefulStringStoringMap(mode, "second"))
+ .setParallelism(4);
+
+ // TODO: re-enable this when generating the actual 1.2 savepoint
+ //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
+ map.uid("second"); // with the guard commented out, the uid is set in every mode
+ //}
+
+ return map;
+ }
+ // Source emitting the tuples (0,0) .. (9,9); in GENERATE and MIGRATE mode it then blocks until cancel() is called.
+ private static final class IntegerTupleSource extends RichSourceFunction<Tuple2<Integer, Integer>> {
+
+ private static final long serialVersionUID = 1912878510707871659L;
+ private final ExecutionMode mode;
+
+ private boolean running = true; // read in run() and cleared in cancel(), both inside synchronized (this)
+
+ private IntegerTupleSource(ExecutionMode mode) {
+ this.mode = mode;
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+ for (int x = 0; x < 10; x++) {
+ ctx.collect(new Tuple2<>(x, x));
+ }
+
+ switch (mode) { // no case for any other mode (e.g. RESTORE): run() then returns right after emitting
+ case GENERATE:
+ case MIGRATE:
+ synchronized (this) {
+ while (running) { // loop re-checks the flag after every wake-up
+ this.wait();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ synchronized (this) {
+ running = false;
+ this.notifyAll(); // wakes up a run() call that is waiting on this instance
+ }
+ }
+ }
+ // Window function using the ListState "values": GENERATE stores each element's f1, MIGRATE/RESTORE compare the input against the state.
+ private static final class StatefulWindowFunction extends RichWindowFunction<Tuple2<Integer, Integer>, Integer, Tuple, GlobalWindow> {
+
+ private static final long serialVersionUID = -7236313076792964055L;
+
+ private final ExecutionMode mode;
+ private transient ListState<Integer> state; // obtained in open()
+
+ private boolean applyCalled = false; // set by apply(), checked in close()
+
+ private StatefulWindowFunction(ExecutionMode mode) {
+ this.mode = mode;
+ }
+
+ @Override
+ public void open(Configuration config) {
+ this.state = getRuntimeContext().getListState(new ListStateDescriptor<>("values", Integer.class));
+ }
+
+ @Override
+ public void apply(Tuple key, GlobalWindow window, Iterable<Tuple2<Integer, Integer>> values, Collector<Integer> out) throws Exception {
+ // fail-safe to make sure apply is actually called
+ applyCalled = true;
+ switch (mode) { // note: no branch writes to the collector
+ case GENERATE:
+ for (Tuple2<Integer, Integer> value : values) {
+ state.add(value.f1);
+ }
+ break;
+ case MIGRATE:
+ case RESTORE:
+ Iterator<Tuple2<Integer, Integer>> input = values.iterator();
+ Iterator<Integer> restored = state.get().iterator();
+ while (input.hasNext() && restored.hasNext()) { // pairwise comparison of window input and state contents
+ Tuple2<Integer, Integer> value = input.next();
+ Integer rValue = restored.next();
+ Assert.assertEquals(rValue, value.f1);
+ }
+ Assert.assertEquals(restored.hasNext(), input.hasNext()); // both iterators must run out together, i.e. same element count
+ }
+ }
+
+ @Override
+ public void close() {
+ Assert.assertTrue("Apply was never called.", applyCalled);
+ }
+ }
+ // Identity map whose snapshot is the single string valueToStore + subtask index; MIGRATE/RESTORE verify that string on restore.
+ private static class StatefulStringStoringMap extends RichMapFunction<Integer, Integer> implements ListCheckpointed<String> {
+
+ private static final long serialVersionUID = 6092985758425330235L;
+ private final ExecutionMode mode;
+ private final String valueToStore;
+
+ private StatefulStringStoringMap(ExecutionMode mode, String valueToStore) {
+ this.mode = mode;
+ this.valueToStore = valueToStore;
+ }
+
+ @Override
+ public Integer map(Integer value) throws Exception {
+ return value; // records pass through unchanged
+ }
+
+ @Override
+ public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Arrays.asList(valueToStore + getRuntimeContext().getIndexOfThisSubtask());
+ }
+
+ @Override
+ public void restoreState(List<String> state) throws Exception {
+ switch (mode) {
+ case GENERATE:
+ break; // nothing is verified while generating
+ case MIGRATE:
+ case RESTORE:
+ Assert.assertEquals("Failed for " + valueToStore + getRuntimeContext().getIndexOfThisSubtask(), 1, state.size());
+ String value = state.get(0);
+ Assert.assertEquals(valueToStore + getRuntimeContext().getIndexOfThisSubtask(), value); // same string that snapshotState() builds for this subtask index
+ }
+ }
+ }
+
+ // Private constructor: every member of this class is static, so it is not meant to be instantiated.
+ private KeyedJob() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
new file mode 100644
index 0000000..5b51765
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * All classes extending this class will use the same savepoint and migration job.
+ */
+public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
+
+ @Override
+ public void createMigrationJob(StreamExecutionEnvironment env) {
+ /**
+ * Migration job, every stateful operator built in MIGRATE mode: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+ */
+ DataStream<Integer> source = createSource(env, ExecutionMode.MIGRATE);
+
+ SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.MIGRATE, source);
+ first.startNewChain(); // StatefulMap1 begins its own chain rather than joining the source
+
+ SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.MIGRATE, first);
+ second.startNewChain(); // StatefulMap2 heads the chain that also holds Map and StatefulMap3
+
+ SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
+
+ SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.MIGRATE, stateless);
+ }
+
+ @Override
+ protected final String getMigrationSavepointName() {
+ return "nonKeyed"; // final: subclasses cannot override the savepoint name
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
new file mode 100644
index 0000000..6838070
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change breaks up a chain.
+ */
+public class ChainBreakTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+ @Override
+ public void createRestoredJob(StreamExecutionEnvironment env) {
+ /**
+ * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+ * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map) -> StatefulMap3
+ */
+ DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);
+
+ SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
+ first.startNewChain();
+
+ SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
+ second.startNewChain();
+
+ SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
+
+ SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
+ third.startNewChain(); // the change under test: StatefulMap3 is split off from CHAIN(StatefulMap2 -> Map)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
new file mode 100644
index 0000000..e405e76
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change removes an operator from a chain.
+ */
+public class ChainLengthDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+ @Override
+ public void createRestoredJob(StreamExecutionEnvironment env) {
+ /**
+ * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+ * Modified job: Source -> StatefulMap1 -> CHAIN(Map -> StatefulMap3)
+ */
+ DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);
+
+ SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
+ first.startNewChain();
+
+ SingleOutputStreamOperator<Integer> stateless = createStatelessMap(first); // StatefulMap2 is not created at all in this job
+ stateless.startNewChain(); // the stateless Map now heads the chain
+
+ SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
new file mode 100644
index 0000000..b78aa10
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change adds an operator to a chain.
+ */
+public class ChainLengthIncreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+ @Override
+ public void createRestoredJob(StreamExecutionEnvironment env) {
+ /**
+ * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+ * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> Map -> StatefulMap3)
+ */
+ DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);
+
+ SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
+ first.startNewChain();
+
+ SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
+ second.startNewChain();
+
+ SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
+
+ SingleOutputStreamOperator<Integer> stateless2 = createStatelessMap(stateless); // the operator added to the chain: a second stateless Map
+
+ SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
new file mode 100644
index 0000000..7c68b4e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change causes the ordering of a chain to change.
+ */
+public class ChainOrderTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+ @Override
+ public void createRestoredJob(StreamExecutionEnvironment env) {
+ /**
+ * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+ * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap3 -> Map -> StatefulMap2)
+ */
+ DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);
+
+ SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
+ first.startNewChain();
+
+ SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, first);
+ third.startNewChain(); // StatefulMap3 now heads the chain
+
+ SingleOutputStreamOperator<Integer> stateless = createStatelessMap(third);
+
+ SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, stateless); // StatefulMap2 now comes last
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
new file mode 100644
index 0000000..3f2fba4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change creates a new chain.
+ */
+public class ChainUnionTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+ @Override
+ public void createRestoredJob(StreamExecutionEnvironment env) {
+ /**
+ * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+ * Modified job: Source -> CHAIN(StatefulMap1 -> StatefulMap2 -> Map -> StatefulMap3)
+ */
+ DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);
+
+ SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
+ first.startNewChain(); // StatefulMap1 heads the merged chain
+
+ SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first); // the change under test: no startNewChain() call on StatefulMap2 here
+
+ SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
+
+ SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
new file mode 100644
index 0000000..32067b3
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Savepoint generator to create the job used by the {@link AbstractNonKeyedOperatorRestoreTestBase}.
+ *
+ * <p>The job should be cancelled manually through the REST API using the cancel-with-savepoint operation.
+ */
+public class NonKeyedJob {
+
+    /** Parallelism applied to the source and to every map operator of the job. */
+    private static final int PARALLELISM = 4;
+
+    /**
+     * Builds the job and executes it in a local environment with web UI.
+     *
+     * @param args program arguments; the {@code savepoint-path} parameter is required
+     */
+    public static void main(String[] args) throws Exception {
+        ParameterTool pt = ParameterTool.fromArgs(args);
+        String savepointsPath = pt.getRequired("savepoint-path");
+
+        Configuration config = new Configuration();
+        config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointsPath);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
+        env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setStateBackend(new MemoryStateBackend());
+
+        // Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+        DataStream<Integer> source = createSource(env, ExecutionMode.GENERATE);
+
+        SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.GENERATE, source);
+        first.startNewChain();
+
+        SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.GENERATE, first);
+        second.startNewChain();
+
+        SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
+        createThirdStatefulMap(ExecutionMode.GENERATE, stateless);
+
+        env.execute("job");
+    }
+
+    /** Adds an {@code IntegerSource} for the given mode to the environment. */
+    public static SingleOutputStreamOperator<Integer> createSource(StreamExecutionEnvironment env, ExecutionMode mode) {
+        return env.addSource(new IntegerSource(mode)).setParallelism(PARALLELISM);
+    }
+
+    /** Appends the stateful map that stores {@code "first"} and carries the uid {@code "first"}. */
+    public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+        return statefulMap(mode, input, "first").uid("first");
+    }
+
+    /** Appends the stateful map that stores {@code "second"} and carries the uid {@code "second"}. */
+    public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+        return statefulMap(mode, input, "second").uid("second");
+    }
+
+    /** Appends the stateful map that stores {@code "third"}; its uid is only set for MIGRATE and RESTORE. */
+    public static SingleOutputStreamOperator<Integer> createThirdStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+        SingleOutputStreamOperator<Integer> map = statefulMap(mode, input, "third");
+
+        // we cannot set the uid on a chained operator in 1.2
+        if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
+            map.uid("third");
+        }
+
+        return map;
+    }
+
+    /** Appends the stateless pass-through map. */
+    public static SingleOutputStreamOperator<Integer> createStatelessMap(DataStream<Integer> input) {
+        return input.map(new NoOpMapFunction()).setParallelism(PARALLELISM);
+    }
+
+    /** Shared construction of the stateful maps: a {@code StatefulStringStoringMap} with the job parallelism. */
+    private static SingleOutputStreamOperator<Integer> statefulMap(ExecutionMode mode, DataStream<Integer> input, String value) {
+        return input.map(new StatefulStringStoringMap(mode, value)).setParallelism(PARALLELISM);
+    }
+
+    /** Identity map that snapshots a fixed string plus the subtask index and checks it when state is restored. */
+    private static final class StatefulStringStoringMap extends RichMapFunction<Integer, Integer> implements ListCheckpointed<String> {
+        private static final long serialVersionUID = 6092985758425330235L;
+        private final ExecutionMode mode;
+        private final String valueToStore;
+
+        private StatefulStringStoringMap(ExecutionMode mode, String valueToStore) {
+            this.mode = mode;
+            this.valueToStore = valueToStore;
+        }
+
+        @Override
+        public Integer map(Integer value) throws Exception {
+            return value;
+        }
+
+        @Override
+        public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
+            return Arrays.asList(valueToStore + getRuntimeContext().getIndexOfThisSubtask());
+        }
+
+        @Override
+        public void restoreState(List<String> state) throws Exception {
+            switch (mode) {
+                case GENERATE:
+                    break;
+                case MIGRATE:
+                case RESTORE:
+                    String expected = valueToStore + getRuntimeContext().getIndexOfThisSubtask();
+                    Assert.assertEquals("Failed for " + expected, 1, state.size());
+                    Assert.assertEquals(expected, state.get(0));
+            }
+        }
+    }
+
+    /** Map function that forwards every element unchanged. */
+    private static final class NoOpMapFunction implements MapFunction<Integer, Integer> {
+        private static final long serialVersionUID = 6584823409744624276L;
+
+        @Override
+        public Integer map(Integer value) throws Exception {
+            return value;
+        }
+    }
+
+    /** Source that emits a single element and, for GENERATE and MIGRATE, then blocks until it is cancelled. */
+    private static final class IntegerSource extends RichParallelSourceFunction<Integer> {
+        private static final long serialVersionUID = 1912878510707871659L;
+        private final ExecutionMode mode;
+        private volatile boolean running = true;
+
+        private IntegerSource(ExecutionMode mode) {
+            this.mode = mode;
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            ctx.collect(1);
+            switch (mode) {
+                case GENERATE:
+                case MIGRATE:
+                    // keep the job running until cancel-with-savepoint was done
+                    synchronized (this) {
+                        while (running) {
+                            this.wait();
+                        }
+                    }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            synchronized (this) {
+                running = false;
+                this.notifyAll();
+            }
+        }
+    }
+
+    private NonKeyedJob() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
new file mode 100644
index 0000000..9e03876
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata differ
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata
new file mode 100644
index 0000000..8fcd1ea
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata differ