You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/02 07:10:23 UTC
[2/2] flink git commit: [FLINK-4445] [checkpointing] Add option to
allow non restored savepoint state
[FLINK-4445] [checkpointing] Add option to allow non restored savepoint state
Allows skipping savepoint state that cannot be mapped to a job vertex when
restoring.
This closes #2713.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da32af12
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da32af12
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da32af12
Branch: refs/heads/release-1.1
Commit: da32af1236c321a55e9b877524b04123624f1d69
Parents: 1f91261
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Oct 27 09:49:01 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 2 08:08:45 2016 +0100
----------------------------------------------------------------------
.../savepoint/SavepointCoordinator.java | 20 ++-
.../runtime/executiongraph/ExecutionGraph.java | 15 +-
.../jobgraph/SavepointRestoreSettings.java | 34 ++---
.../flink/runtime/jobmanager/JobManager.scala | 4 +-
.../SavepointCoordinatorRestoreTest.java | 150 ++++++++++++++++++
.../savepoint/SavepointCoordinatorTest.java | 8 +-
.../runtime/jobmanager/JobManagerTest.java | 153 +++++++++++++++++++
7 files changed, 347 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
index cd9eb0c..2cb8636 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
@@ -181,13 +181,16 @@ public class SavepointCoordinator extends CheckpointCoordinator {
*
* @param tasks Tasks that will possibly be reset
* @param savepointPath The path of the savepoint to rollback to
+ * @param allowNonRestoredState Allow to skip checkpoint state that cannot be mapped
+ * to any job vertex in tasks.
* @throws IllegalStateException If coordinator is shut down
* @throws IllegalStateException If mismatch between program and savepoint state
* @throws Exception If savepoint store failure
*/
public void restoreSavepoint(
Map<JobVertexID, ExecutionJobVertex> tasks,
- String savepointPath) throws Exception {
+ String savepointPath,
+ boolean allowNonRestoredState) throws Exception {
checkNotNull(savepointPath, "Savepoint path");
@@ -208,8 +211,8 @@ public class SavepointCoordinator extends CheckpointCoordinator {
String msg = String.format("Failed to rollback to savepoint %s. " +
"Parallelism mismatch between savepoint state and new program. " +
"Cannot map operator %s with parallelism %d to new program with " +
- "parallelism %d. This indicates that the program has been changed " +
- "in a non-compatible way after the savepoint.",
+ "parallelism %d. You cannot change parallelism of " +
+ "Flink 1.1 programs.",
savepoint,
taskState.getJobVertexID(),
taskState.getParallelism(),
@@ -239,12 +242,15 @@ public class SavepointCoordinator extends CheckpointCoordinator {
currentExecutionAttempt.setInitialState(state, kvStateForTaskMap);
}
+ } else if (allowNonRestoredState) {
+ LOG.info("Ignoring checkpoint state for operator {}.", taskState.getJobVertexID());
} else {
String msg = String.format("Failed to rollback to savepoint %s. " +
- "Cannot map old state for task %s to the new program. " +
- "This indicates that the program has been changed in a " +
- "non-compatible way after the savepoint.", savepointPath,
- taskState.getJobVertexID());
+ "Cannot map savepoint state for operator %s to the new program, " +
+ "because the operator is not available in the new program. If " +
+ "you want to allow this, you can set the --allowNonRestoredState " +
+ "option on the CLI.",
+ savepointPath, taskState.getJobVertexID());
throw new IllegalStateException(msg);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ce8e0c9..d3b48df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -953,7 +953,7 @@ public class ExecutionGraph implements Serializable {
if (!restored && savepointCoordinator != null) {
String savepointPath = savepointCoordinator.getSavepointRestorePath();
if (savepointPath != null) {
- savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath);
+ savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath, false);
}
}
}
@@ -990,21 +990,20 @@ public class ExecutionGraph implements Serializable {
* actor.
*
* @param savepointPath The path of the savepoint to rollback to.
+ * @param allowNonRestoredState Whether to skip savepoint state that cannot be mapped
+ * to any job vertex of this execution graph (instead of failing the restore).
* @throws IllegalStateException If checkpointing is disabled
* @throws IllegalStateException If checkpoint coordinator is shut down
* @throws Exception If failure during rollback
*/
- public void restoreSavepoint(String savepointPath) throws Exception {
+ public void restoreSavepoint(String savepointPath, boolean allowNonRestoredState) throws Exception {
synchronized (progressLock) {
if (savepointCoordinator != null) {
LOG.info("Restoring savepoint: " + savepointPath + ".");
-
savepointCoordinator.restoreSavepoint(
- getAllVertices(), savepointPath);
- }
- else {
- // Sanity check
- throw new IllegalStateException("Checkpointing disabled.");
+ getAllVertices(), savepointPath, allowNonRestoredState);
+ } else {
+ throw new IllegalStateException("Savepoints disabled.");
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 48d3997..10119bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -33,26 +33,26 @@ public class SavepointRestoreSettings implements Serializable {
private final static SavepointRestoreSettings NONE = new SavepointRestoreSettings(null, false);
/** By default, be strict when restoring from a savepoint. */
- private final static boolean DEFAULT_IGNORE_UNMAPPED_STATE = false;
+ private final static boolean DEFAULT_ALLOW_NON_RESTORED_STATE = false;
/** Savepoint restore path. */
private final String restorePath;
/**
- * Flag indicating whether the restore should ignore if the savepoint contains
- * state for an operator that is not part of the job.
+ * Flag indicating whether savepoint state for an operator that is not part of
+ * the job may be skipped (left unrestored) instead of failing the restore.
*/
- private final boolean ignoreUnmappedState;
+ private final boolean allowNonRestoredState;
/**
* Creates the restore settings.
*
* @param restorePath Savepoint restore path.
- * @param ignoreUnmappedState Ignore unmapped state.
+ * @param allowNonRestoredState Whether savepoint state that cannot be mapped to the job may be skipped.
*/
- private SavepointRestoreSettings(String restorePath, boolean ignoreUnmappedState) {
+ private SavepointRestoreSettings(String restorePath, boolean allowNonRestoredState) {
this.restorePath = restorePath;
- this.ignoreUnmappedState = ignoreUnmappedState;
+ this.allowNonRestoredState = allowNonRestoredState;
}
/**
@@ -73,14 +73,14 @@ public class SavepointRestoreSettings implements Serializable {
}
/**
- * Returns whether the restore should ignore whether the savepoint contains
- * state that cannot be mapped to the job.
+ * Returns whether savepoint state that cannot be mapped back to the job may be
+ * skipped (left unrestored) when restoring.
*
- * @return <code>true</code> if restore should ignore whether the savepoint contains
- * state that cannot be mapped to the job.
+ * @return <code>true</code> if savepoint state that cannot be mapped back to the
+ * job may be skipped; <code>false</code> if the restore must fail instead.
*/
- public boolean ignoreUnmappedState() {
- return ignoreUnmappedState;
+ public boolean allowNonRestoredState() {
+ return allowNonRestoredState;
}
@Override
@@ -88,7 +88,7 @@ public class SavepointRestoreSettings implements Serializable {
if (restoreSavepoint()) {
return "SavepointRestoreSettings.forPath(" +
"restorePath='" + restorePath + '\'' +
- ", ignoreUnmappedState=" + ignoreUnmappedState +
+ ", allowNonRestoredState=" + allowNonRestoredState +
')';
} else {
return "SavepointRestoreSettings.none()";
@@ -102,12 +102,12 @@ public class SavepointRestoreSettings implements Serializable {
}
public static SavepointRestoreSettings forPath(String savepointPath) {
- return forPath(savepointPath, DEFAULT_IGNORE_UNMAPPED_STATE);
+ return forPath(savepointPath, DEFAULT_ALLOW_NON_RESTORED_STATE);
}
- public static SavepointRestoreSettings forPath(String savepointPath, boolean ignoreUnmappedState) {
+ public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState) {
checkNotNull(savepointPath, "Savepoint restore path.");
- return new SavepointRestoreSettings(savepointPath, ignoreUnmappedState);
+ return new SavepointRestoreSettings(savepointPath, allowNonRestoredState);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a28c25f..e5b5267 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1299,7 +1299,9 @@ class JobManager(
val savepointPath = savepointSettings.getRestorePath()
try {
- executionGraph.restoreSavepoint(savepointPath)
+ executionGraph.restoreSavepoint(
+ savepointPath,
+ savepointSettings.allowNonRestoredState())
} catch {
case e: Exception =>
jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(e)))
http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
new file mode 100644
index 0000000..cd03403
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests concerning the restoring of state from a savepoint to the task executions.
+ */
+public class SavepointCoordinatorRestoreTest {
+
+ /**
+ * Tests that the allowNonRestoredState flag of restoreSavepoint is handled correctly.
+ *
+ * The flag only matters for savepoint state that has no matching task; tasks without savepoint state restore fine either way.
+ */
+ @Test
+ public void testRestoreUnmappedCheckpointState() throws Exception {
+ // --- (1) Create the tasks (two job vertices) to restore the savepoint into ---
+ JobVertexID jobVertexId1 = new JobVertexID();
+ JobVertexID jobVertexId2 = new JobVertexID();
+
+ // 1st JobVertex (parallelism 3)
+ ExecutionVertex vertex11 = mockExecutionVertex(mockExecution(), jobVertexId1, 0, 3);
+ ExecutionVertex vertex12 = mockExecutionVertex(mockExecution(), jobVertexId1, 1, 3);
+ ExecutionVertex vertex13 = mockExecutionVertex(mockExecution(), jobVertexId1, 2, 3);
+ // 2nd JobVertex (parallelism 2)
+ ExecutionVertex vertex21 = mockExecutionVertex(mockExecution(), jobVertexId2, 0, 2);
+ ExecutionVertex vertex22 = mockExecutionVertex(mockExecution(), jobVertexId2, 1, 2);
+
+ ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(jobVertexId1, new ExecutionVertex[] { vertex11, vertex12, vertex13 });
+ ExecutionJobVertex jobVertex2 = mockExecutionJobVertex(jobVertexId2, new ExecutionVertex[] { vertex21, vertex22 });
+
+ Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+ tasks.put(jobVertexId1, jobVertex1);
+ tasks.put(jobVertexId2, jobVertex2);
+
+ SavepointStore store = new HeapSavepointStore();
+
+ SavepointCoordinator coord = new SavepointCoordinator(
+ new JobID(),
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ 0,
+ new ExecutionVertex[] {},
+ new ExecutionVertex[] {},
+ new ExecutionVertex[] {},
+ getClass().getClassLoader(),
+ new StandaloneCheckpointIDCounter(),
+ store,
+ new DisabledCheckpointStatsTracker());
+
+ // --- (2) Savepoint has no state for jobVertexId2 (should work with either flag) ---
+ Map<JobVertexID, TaskState> checkpointTaskStates = new HashMap<>();
+ checkpointTaskStates.put(jobVertexId1, new TaskState(jobVertexId1, 3));
+
+ CompletedCheckpoint checkpoint = new CompletedCheckpoint(new JobID(), 0, 1, 2, new HashMap<>(checkpointTaskStates));
+
+ Savepoint savepoint = new SavepointV0(checkpoint.getCheckpointID(), checkpointTaskStates.values());
+ String savepointPath = store.storeSavepoint(savepoint);
+
+ coord.restoreSavepoint(tasks, savepointPath, false);
+ coord.restoreSavepoint(tasks, savepointPath, true);
+
+ // --- (3) Savepoint contains state for a job vertex that has no task ---
+ JobVertexID newJobVertexID = new JobVertexID();
+
+ // newJobVertexID is not a key of 'tasks', so this state cannot be mapped
+ checkpointTaskStates.put(newJobVertexID, new TaskState(newJobVertexID, 1));
+
+ checkpoint = new CompletedCheckpoint(new JobID(), 1, 2, 3, new HashMap<>(checkpointTaskStates));
+ savepoint = new SavepointV0(checkpoint.getCheckpointID(), checkpointTaskStates.values());
+ savepointPath = store.storeSavepoint(savepoint);
+
+ // (i) allowNonRestoredState = true: unmapped state is skipped (should succeed)
+ coord.restoreSavepoint(tasks, savepointPath, true);
+
+ // (ii) allowNonRestoredState = false: unmapped state is rejected (should fail)
+ try {
+ coord.restoreSavepoint(tasks, savepointPath, false);
+ fail("Did not throw the expected Exception.");
+ } catch (IllegalStateException ignored) { // expected: state of newJobVertexID cannot be mapped
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private Execution mockExecution() {
+ return mockExecution(ExecutionState.RUNNING);
+ }
+
+ private Execution mockExecution(ExecutionState state) {
+ Execution mock = Mockito.mock(Execution.class);
+ when(mock.getAttemptId()).thenReturn(new ExecutionAttemptID());
+ when(mock.getState()).thenReturn(state);
+ return mock;
+ }
+
+ private ExecutionVertex mockExecutionVertex(Execution execution, JobVertexID vertexId, int subtask, int parallelism) {
+ ExecutionVertex mock = Mockito.mock(ExecutionVertex.class);
+ when(mock.getJobvertexId()).thenReturn(vertexId);
+ when(mock.getParallelSubtaskIndex()).thenReturn(subtask);
+ when(mock.getCurrentExecutionAttempt()).thenReturn(execution);
+ when(mock.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
+ return mock;
+ }
+
+ private ExecutionJobVertex mockExecutionJobVertex(JobVertexID id, ExecutionVertex[] vertices) {
+ ExecutionJobVertex vertex = Mockito.mock(ExecutionJobVertex.class);
+ when(vertex.getParallelism()).thenReturn(vertices.length);
+ when(vertex.getJobVertexId()).thenReturn(id);
+ when(vertex.getTaskVertices()).thenReturn(vertices);
+ return vertex;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
index 6b4f354..b1b384d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
@@ -271,7 +271,7 @@ public class SavepointCoordinatorTest extends TestLogger {
assertNotNull(savepointPath);
// Rollback
- coordinator.restoreSavepoint(createExecutionJobVertexMap(jobVertices), savepointPath);
+ coordinator.restoreSavepoint(createExecutionJobVertexMap(jobVertices), savepointPath, false);
// Verify all executions have been reset
for (ExecutionVertex vertex : ackVertices) {
@@ -339,7 +339,7 @@ public class SavepointCoordinatorTest extends TestLogger {
// Rollback
coordinator.restoreSavepoint(
createExecutionJobVertexMap(jobVertices),
- savepointPath);
+ savepointPath, false);
fail("Did not throw expected Exception after rollback with parallelism mismatch.");
}
catch (Exception ignored) {
@@ -385,7 +385,7 @@ public class SavepointCoordinatorTest extends TestLogger {
// Rollback
coordinator.restoreSavepoint(
createExecutionJobVertexMap(jobVertex),
- savepointPath);
+ savepointPath, false);
fail("Did not throw expected Exception after rollback with savepoint store failure.");
}
@@ -416,7 +416,7 @@ public class SavepointCoordinatorTest extends TestLogger {
checkpointIdCounter,
savepointStore);
- coordinator.restoreSavepoint(createExecutionJobVertexMap(), "any");
+ coordinator.restoreSavepoint(createExecutionJobVertexMap(), "any", false);
verify(checkpointIdCounter).setCount(eq(12312312L + 1));
http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 5515c00..d60e060 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -24,6 +24,7 @@ import akka.actor.PoisonPill;
import akka.testkit.JavaTestKit;
import com.typesafe.config.Config;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
@@ -464,4 +466,155 @@ public class JobManagerTest extends TestLogger {
}
}
+ /**
+ * Tests that allowNonRestoredState in {@link SavepointRestoreSettings} is respected: restoring into a job with a new vertex ID fails without it and succeeds with it.
+ */
+ @Test
+ public void testSavepointRestoreSettings() throws Exception {
+ FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+ ActorSystem actorSystem = null;
+ ActorGateway jobManager = null;
+ ActorGateway archiver = null;
+ ActorGateway taskManager = null;
+ try {
+ actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+ Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+ new Configuration(),
+ actorSystem,
+ Option.apply("jm"),
+ Option.apply("arch"),
+ TestingJobManager.class,
+ TestingMemoryArchivist.class);
+
+ jobManager = new AkkaActorGateway(master._1(), null);
+ archiver = new AkkaActorGateway(master._2(), null);
+
+ Configuration tmConfig = new Configuration();
+ tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+
+ ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+ tmConfig,
+ ResourceID.generate(),
+ actorSystem,
+ "localhost",
+ Option.apply("tm"),
+ Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
+ true,
+ TestingTaskManager.class);
+
+ taskManager = new AkkaActorGateway(taskManagerRef, null);
+
+ // Wait until the TaskManager has registered at the JobManager
+ Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+ Await.ready(taskManager.ask(msg, timeout), timeout);
+
+ // Create the original job graph (single stateful source vertex)
+ JobVertex sourceVertex = new JobVertex("Source");
+ sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
+ sourceVertex.setParallelism(1);
+
+ JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+ JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+ Collections.singletonList(sourceVertex.getID()),
+ Collections.singletonList(sourceVertex.getID()),
+ Collections.singletonList(sourceVertex.getID()),
+ Long.MAX_VALUE, // deactivated checkpointing
+ 360000,
+ 0,
+ Integer.MAX_VALUE);
+
+ jobGraph.setSnapshotSettings(snapshottingSettings);
+
+ // Submit job graph
+ msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+ Await.result(jobManager.ask(msg, timeout), timeout);
+
+ // Wait for all tasks to be running
+ msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+ Await.result(jobManager.ask(msg, timeout), timeout);
+
+ // Trigger a savepoint and remember its path
+ msg = new JobManagerMessages.TriggerSavepoint(jobGraph.getJobID());
+ Future<Object> future = jobManager.ask(msg, timeout);
+ Object result = Await.result(future, timeout);
+
+ String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) result).savepointPath();
+
+ // Cancel the original job and wait until it has been removed
+ msg = new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID());
+ Future<?> removedFuture = jobManager.ask(msg, timeout);
+
+ Future<?> cancelFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobGraph.getJobID()), timeout);
+ Object response = Await.result(cancelFuture, timeout);
+ assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
+
+ Await.ready(removedFuture, timeout);
+
+ // Build a new job whose source vertex has a new ID, so the savepoint state cannot be mapped to it
+ JobVertex newSourceVertex = new JobVertex("Source");
+ newSourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
+ newSourceVertex.setParallelism(1);
+
+ JobGraph newJobGraph = new JobGraph("TestingJob", newSourceVertex);
+
+ JobSnapshottingSettings newSnapshottingSettings = new JobSnapshottingSettings(
+ Collections.singletonList(newSourceVertex.getID()),
+ Collections.singletonList(newSourceVertex.getID()),
+ Collections.singletonList(newSourceVertex.getID()),
+ Long.MAX_VALUE, // deactivated checkpointing
+ 360000,
+ 0,
+ Integer.MAX_VALUE);
+
+ newJobGraph.setSnapshotSettings(newSnapshottingSettings);
+
+ SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savepointPath, false); // strict: non-restored state not allowed
+ newJobGraph.setSavepointRestoreSettings(restoreSettings);
+
+ msg = new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
+ response = Await.result(jobManager.ask(msg, timeout), timeout);
+
+ assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.JobResultFailure);
+
+ JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) response;
+ Throwable cause = failure.cause().deserializeError(ClassLoader.getSystemClassLoader());
+
+ assertTrue(cause instanceof IllegalStateException);
+ assertTrue(cause.getMessage().contains("allowNonRestoredState"));
+
+ // Wait until the failed job has been removed before resubmitting it
+ msg = new TestingJobManagerMessages.NotifyWhenJobRemoved(newJobGraph.getJobID());
+
+ Await.ready(jobManager.ask(msg, timeout), timeout);
+
+ // Resubmit the same job graph, this time allowing non-restored state
+ restoreSettings = SavepointRestoreSettings.forPath(savepointPath, true);
+ newJobGraph.setSavepointRestoreSettings(restoreSettings);
+
+ msg = new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
+ response = Await.result(jobManager.ask(msg, timeout), timeout);
+
+ assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.JobSubmitSuccess);
+ } finally {
+ if (actorSystem != null) {
+ actorSystem.shutdown();
+ }
+
+ if (archiver != null) {
+ archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ if (jobManager != null) {
+ jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ if (taskManager != null) {
+ taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+ }
+ }
+
}