You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 19:47:21 UTC

[flink] 06/06: [FLINK-11354][tests] Port JobManagerHARecoveryTest to new code base

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c8410766515740784aea968662e6dd722b4b2d70
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 14:25:24 2019 +0100

    [FLINK-11354][tests] Port JobManagerHARecoveryTest to new code base
    
    - Moved JobManagerHARecoveryTest#testJobRecoveryWhenLosingLeadership to
    DispatcherHATest#testJobRecoveryWhenChangingLeadership
    
    - Moved JobManagerHARecoveryTest#testFailingJobRecovery to
    DispatcherHATest#testFailingRecoveryIsAFatalError
    
    This closes #7539.
---
 .../flink/runtime/dispatcher/Dispatcher.java       |   5 +-
 .../flink/runtime/dispatcher/DispatcherHATest.java | 179 +++++-
 .../jobmanager/JobManagerHARecoveryTest.java       | 605 ---------------------
 3 files changed, 172 insertions(+), 617 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 544a438..81b826e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -834,7 +834,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				confirmationFuture.whenComplete(
 					(Void ignored, Throwable throwable) -> {
 						if (throwable != null) {
-							onFatalError(ExceptionUtils.stripCompletionException(throwable));
+							onFatalError(
+								new DispatcherException(
+									String.format("Failed to take leadership with session id %s.", newLeaderSessionID),
+									(ExceptionUtils.stripCompletionException(throwable))));
 						}
 					});
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index a6ad3fb..b965e71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -43,7 +43,10 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -64,6 +67,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -125,7 +129,7 @@ public class DispatcherHATest extends TestLogger {
 
 		final BlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
 
-		final HATestingDispatcher dispatcher = createDispatcher(highAvailabilityServices, fencingTokens);
+		final HATestingDispatcher dispatcher = createDispatcherWithObservableFencingTokens(highAvailabilityServices, fencingTokens);
 
 		dispatcher.start();
 
@@ -162,7 +166,7 @@ public class DispatcherHATest extends TestLogger {
 			.build();
 
 		final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
-		final HATestingDispatcher dispatcher = createDispatcher(
+		final HATestingDispatcher dispatcher = createDispatcherWithObservableFencingTokens(
 			highAvailabilityServices,
 			fencingTokens);
 
@@ -195,20 +199,117 @@ public class DispatcherHATest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that a Dispatcher does not remove the JobGraph from the submitted job graph store
+	 * when losing leadership and recovers it when regaining leadership.
+	 */
+	@Test
+	public void testJobRecoveryWhenChangingLeadership() throws Exception {
+		final InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
+
+		final CompletableFuture<JobID> recoveredJobFuture = new CompletableFuture<>();
+		submittedJobGraphStore.setRecoverJobGraphFunction((jobID, jobIDSubmittedJobGraphMap) -> {
+			recoveredJobFuture.complete(jobID);
+			return jobIDSubmittedJobGraphMap.get(jobID);
+		});
+
+		final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServicesBuilder()
+			.setSubmittedJobGraphStore(submittedJobGraphStore)
+			.setDispatcherLeaderElectionService(leaderElectionService)
+			.build();
+
+		final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
+		final HATestingDispatcher dispatcher = createDispatcherWithObservableFencingTokens(
+			highAvailabilityServices,
+			fencingTokens);
+
+		dispatcher.start();
+
+		try {
+			// grant leadership and submit a single job
+			final DispatcherId expectedDispatcherId = DispatcherId.generate();
+			leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
+
+			assertThat(fencingTokens.take(), is(equalTo(expectedDispatcherId)));
+
+			final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+			final JobGraph jobGraph = createNonEmptyJobGraph();
+			final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+
+			submissionFuture.get();
+
+			final JobID jobId = jobGraph.getJobID();
+			assertThat(submittedJobGraphStore.contains(jobId), is(true));
+
+			// revoke the leadership --> this should stop all running JobManagerRunners
+			leaderElectionService.notLeader();
+
+			assertThat(fencingTokens.take(), is(equalTo(NULL_FENCING_TOKEN)));
+
+			assertThat(submittedJobGraphStore.contains(jobId), is(true));
+
+			assertThat(recoveredJobFuture.isDone(), is(false));
+
+			// re-grant leadership
+			leaderElectionService.isLeader(DispatcherId.generate().toUUID());
+
+			assertThat(recoveredJobFuture.get(), is(equalTo(jobId)));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+		}
+	}
+
+	/**
+	 * Tests that a fatal error is reported if the job recovery fails.
+	 */
+	@Test
+	public void testFailingRecoveryIsAFatalError() throws Exception {
+		final String exceptionMessage = "Job recovery test failure.";
+		final Supplier<Exception> exceptionSupplier = () -> new FlinkException(exceptionMessage);
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder()
+			.setSubmittedJobGraphStore(new FailingSubmittedJobGraphStore(exceptionSupplier))
+			.build();
+
+		final HATestingDispatcher dispatcher = createDispatcher(haServices);
+		dispatcher.start();
+
+		final Throwable failure = testingFatalErrorHandler.getErrorFuture().get();
+
+		assertThat(ExceptionUtils.findThrowableWithMessage(failure, exceptionMessage).isPresent(), is(true));
+
+		testingFatalErrorHandler.clearError();
+	}
+
 	@Nonnull
-	public static JobGraph createNonEmptyJobGraph() {
-		final JobVertex noOpVertex = new JobVertex("NoOp vertex");
-		noOpVertex.setInvokableClass(NoOpInvokable.class);
-		final JobGraph jobGraph = new JobGraph(noOpVertex);
-		jobGraph.setAllowQueuedScheduling(true);
+	private HATestingDispatcher createDispatcherWithObservableFencingTokens(HighAvailabilityServices highAvailabilityServices, Queue<DispatcherId> fencingTokens) throws Exception {
+		return createDispatcher(highAvailabilityServices, fencingTokens, createTestingJobManagerRunnerFactory());
+	}
 
-		return jobGraph;
+	@Nonnull
+	private TestingJobManagerRunnerFactory createTestingJobManagerRunnerFactory() {
+		return new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null));
+	}
+
+	@Nonnull
+	private HATestingDispatcher createDispatcherWithJobManagerRunnerFactory(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+		return createDispatcher(highAvailabilityServices, null, jobManagerRunnerFactory);
+	}
+
+	private HATestingDispatcher createDispatcher(HighAvailabilityServices haServices) throws Exception {
+		return createDispatcher(
+			haServices,
+			null,
+			createTestingJobManagerRunnerFactory());
 	}
 
 	@Nonnull
 	private HATestingDispatcher createDispatcher(
-			TestingHighAvailabilityServices highAvailabilityServices,
-			@Nonnull Queue<DispatcherId> fencingTokens) throws Exception {
+		HighAvailabilityServices highAvailabilityServices,
+		@Nullable Queue<DispatcherId> fencingTokens,
+		Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		final Configuration configuration = new Configuration();
 
 		return new HATestingDispatcher(
@@ -222,11 +323,21 @@ public class DispatcherHATest extends TestLogger {
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)),
+			jobManagerRunnerFactory,
 			testingFatalErrorHandler,
 			fencingTokens);
 	}
 
+	@Nonnull
+	public static JobGraph createNonEmptyJobGraph() {
+		final JobVertex noOpVertex = new JobVertex("NoOp vertex");
+		noOpVertex.setInvokableClass(NoOpInvokable.class);
+		final JobGraph jobGraph = new JobGraph(noOpVertex);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		return jobGraph;
+	}
+
 	private static class HATestingDispatcher extends TestingDispatcher {
 
 		@Nonnull
@@ -334,4 +445,50 @@ public class DispatcherHATest extends TestLogger {
 			return Collections.singleton(submittedJobGraph.getJobId());
 		}
 	}
+
+	private static class FailingSubmittedJobGraphStore implements SubmittedJobGraphStore {
+		private final JobID jobId = new JobID();
+
+		private final Supplier<Exception> exceptionSupplier;
+
+		private FailingSubmittedJobGraphStore(Supplier<Exception> exceptionSupplier) {
+			this.exceptionSupplier = exceptionSupplier;
+		}
+
+		@Override
+		public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
+
+		}
+
+		@Override
+		public void stop() throws Exception {
+
+		}
+
+		@Nullable
+		@Override
+		public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+			throw exceptionSupplier.get();
+		}
+
+		@Override
+		public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+
+		}
+
+		@Override
+		public void removeJobGraph(JobID jobId) throws Exception {
+
+		}
+
+		@Override
+		public void releaseJobGraph(JobID jobId) throws Exception {
+
+		}
+
+		@Override
+		public Collection<JobID> getJobIds() throws Exception {
+			return Collections.singleton(jobId);
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
deleted file mode 100644
index d991983..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ /dev/null
@@ -1,605 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.StateObjectCollection;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.instance.InstanceManager;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.OperatorStreamStateHandle;
-import org.apache.flink.runtime.state.TaskStateManager;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingJobManager;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingMessages;
-import org.apache.flink.runtime.testingUtils.TestingTaskManager;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
-import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Identify;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.japi.pf.FI;
-import akka.japi.pf.ReceiveBuilder;
-import akka.pattern.Patterns;
-import akka.testkit.CallingThreadDispatcher;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestProbe;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import scala.Int;
-import scala.Option;
-import scala.PartialFunction;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-import scala.runtime.BoxedUnit;
-
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class JobManagerHARecoveryTest extends TestLogger {
-
-	private static ActorSystem system;
-
-	@Rule
-	public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-	@BeforeClass
-	public static void setup() {
-		system = AkkaUtils.createLocalActorSystem(new Configuration());
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
-	/**
-	 * Tests that the persisted job is not removed from the SubmittedJobGraphStore if the JobManager
-	 * loses its leadership. Furthermore, it tests that the job manager can recover the job from
-	 * the SubmittedJobGraphStore and checkpoint state is recovered as well.
-	 */
-	@Test
-	public void testJobRecoveryWhenLosingLeadership() throws Exception {
-		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
-		FiniteDuration jobRecoveryTimeout = new FiniteDuration(0, TimeUnit.SECONDS);
-		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
-		Configuration flinkConfiguration = new Configuration();
-		UUID leaderSessionID = UUID.randomUUID();
-		UUID newLeaderSessionID = UUID.randomUUID();
-		int slots = 2;
-		ActorRef archive = null;
-		ActorRef jobManager = null;
-		ActorRef taskManager = null;
-
-		flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-		flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
-		flinkConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slots);
-		flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
-
-		try {
-			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-
-			InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
-			submittedJobGraphStore.start(null);
-			CompletedCheckpointStore checkpointStore = new RecoverableCompletedCheckpointStore();
-			CheckpointIDCounter checkpointCounter = new StandaloneCheckpointIDCounter();
-			CheckpointRecoveryFactory checkpointStateFactory = new TestingCheckpointRecoveryFactory(checkpointStore, checkpointCounter);
-			TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
-			SettableLeaderRetrievalService myLeaderRetrievalService = new SettableLeaderRetrievalService(
-				null,
-				null);
-			TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
-
-			testingHighAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, myLeaderRetrievalService);
-
-			InstanceManager instanceManager = new InstanceManager();
-			instanceManager.addInstanceListener(scheduler);
-
-			archive = system.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 10, Option.<Path>empty()));
-
-			BlobServer blobServer = new BlobServer(
-				flinkConfiguration,
-				testingHighAvailabilityServices.createBlobStore());
-			blobServer.start();
-			Props jobManagerProps = Props.create(
-				TestingJobManager.class,
-				flinkConfiguration,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				instanceManager,
-				scheduler,
-				blobServer,
-				new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]),
-				archive,
-				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
-				timeout,
-				myLeaderElectionService,
-				submittedJobGraphStore,
-				checkpointStateFactory,
-				jobRecoveryTimeout,
-				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
-				Option.<String>empty());
-
-			jobManager = system.actorOf(jobManagerProps);
-			ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID);
-
-			taskManager = TaskManager.startTaskManagerComponentsAndActor(
-				flinkConfiguration,
-				ResourceID.generate(),
-				system,
-				testingHighAvailabilityServices,
-				NoOpMetricRegistry.INSTANCE,
-				"localhost",
-				Option.<String>apply("taskmanager"),
-				true,
-				TestingTaskManager.class);
-
-			ActorGateway tmGateway = new AkkaActorGateway(taskManager, leaderSessionID);
-
-			Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
-
-			Await.ready(tmAlive, deadline.timeLeft());
-
-			JobVertex sourceJobVertex = new JobVertex("Source");
-			sourceJobVertex.setInvokableClass(BlockingStatefulInvokable.class);
-			sourceJobVertex.setParallelism(slots);
-
-			JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
-
-			List<JobVertexID> vertexId = Collections.singletonList(sourceJobVertex.getID());
-			jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
-					vertexId,
-					vertexId,
-					vertexId,
-					new CheckpointCoordinatorConfiguration(
-						100L,
-						10L * 60L * 1000L,
-						0L,
-						1,
-						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-						true),
-					null));
-
-			BlockingStatefulInvokable.initializeStaticHelpers(slots);
-
-			Future<Object> isLeader = gateway.ask(
-					TestingJobManagerMessages.getNotifyWhenLeader(),
-					deadline.timeLeft());
-
-			Future<Object> isConnectedToJobManager = tmGateway.ask(
-					new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager),
-					deadline.timeLeft());
-
-			// tell jobManager that he's the leader
-			myLeaderElectionService.isLeader(leaderSessionID);
-			// tell taskManager who's the leader
-			myLeaderRetrievalService.notifyListener(gateway.path(), leaderSessionID);
-
-			Await.ready(isLeader, deadline.timeLeft());
-			Await.ready(isConnectedToJobManager, deadline.timeLeft());
-
-			// submit blocking job
-			Future<Object> jobSubmitted = gateway.ask(
-					new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
-					deadline.timeLeft());
-
-			Await.ready(jobSubmitted, deadline.timeLeft());
-
-			// Wait for some checkpoints to complete
-			BlockingStatefulInvokable.awaitCompletedCheckpoints();
-
-			Future<Object> jobRemoved = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
-
-			// Revoke leadership
-			myLeaderElectionService.notLeader();
-
-			// check that the job gets removed from the JobManager
-			Await.ready(jobRemoved, deadline.timeLeft());
-			// but stays in the submitted job graph store
-			assertTrue(submittedJobGraphStore.contains(jobGraph.getJobID()));
-
-			Future<Object> jobRunning = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft());
-
-			// Make JobManager again a leader
-			myLeaderElectionService.isLeader(newLeaderSessionID);
-			// tell the TaskManager about it
-			myLeaderRetrievalService.notifyListener(gateway.path(), newLeaderSessionID);
-
-			// wait that the job is recovered and reaches state RUNNING
-			Await.ready(jobRunning, deadline.timeLeft());
-
-			Future<Object> jobFinished = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
-
-			BlockingInvokable.unblock();
-
-			// wait til the job has finished
-			Await.ready(jobFinished, deadline.timeLeft());
-
-			// check that the job has been removed from the submitted job graph store
-			assertFalse(submittedJobGraphStore.contains(jobGraph.getJobID()));
-
-			// Check that state has been recovered
-			long[] recoveredStates = BlockingStatefulInvokable.getRecoveredStates();
-			for (long state : recoveredStates) {
-				boolean isExpected = state >= BlockingStatefulInvokable.NUM_CHECKPOINTS_TO_COMPLETE;
-				assertTrue("Did not recover checkpoint state correctly, expecting >= " +
-						BlockingStatefulInvokable.NUM_CHECKPOINTS_TO_COMPLETE +
-						", but state was " + state, isExpected);
-			}
-		} finally {
-			if (archive != null) {
-				archive.tell(PoisonPill.getInstance(), ActorRef.noSender());
-			}
-
-			if (jobManager != null) {
-				jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
-			}
-
-			if (taskManager != null) {
-				taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
-			}
-		}
-	}
-
-	/**
-	 * Tests that a job recovery failure terminates the {@link JobManager}.
-	 */
-	@Test
-	public void testFailingJobRecovery() throws Exception {
-		final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-		final FiniteDuration jobRecoveryTimeout = new FiniteDuration(0, TimeUnit.SECONDS);
-		Deadline deadline = new FiniteDuration(1, TimeUnit.MINUTES).fromNow();
-		final Configuration flinkConfiguration = new Configuration();
-		UUID leaderSessionID = UUID.randomUUID();
-		ActorRef jobManager = null;
-		JobID jobId1 = new JobID();
-		JobID jobId2 = new JobID();
-
-		// set HA mode to zookeeper so that we try to recover jobs
-		flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
-		try {
-			final SubmittedJobGraphStore submittedJobGraphStore = mock(SubmittedJobGraphStore.class);
-
-			SubmittedJobGraph submittedJobGraph = mock(SubmittedJobGraph.class);
-			when(submittedJobGraph.getJobId()).thenReturn(jobId2);
-
-			when(submittedJobGraphStore.getJobIds()).thenReturn(Arrays.asList(jobId1, jobId2));
-
-			// fail the first job recovery
-			when(submittedJobGraphStore.recoverJobGraph(eq(jobId1))).thenThrow(new Exception("Test exception"));
-			// succeed the second job recovery
-			when(submittedJobGraphStore.recoverJobGraph(eq(jobId2))).thenReturn(submittedJobGraph);
-
-			final TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
-
-			final Collection<JobID> recoveredJobs = new ArrayList<>(2);
-
-			BlobServer blobServer = mock(BlobServer.class);
-			Props jobManagerProps = Props.create(
-				TestingFailingHAJobManager.class,
-				flinkConfiguration,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				mock(InstanceManager.class),
-				mock(Scheduler.class),
-				blobServer,
-				new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]),
-				ActorRef.noSender(),
-				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
-				timeout,
-				myLeaderElectionService,
-				submittedJobGraphStore,
-				mock(CheckpointRecoveryFactory.class),
-				jobRecoveryTimeout,
-				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
-				recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
-
-			jobManager = system.actorOf(jobManagerProps);
-
-			final TestProbe testProbe = new TestProbe(system);
-
-			testProbe.watch(jobManager);
-
-			Future<Object> started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis());
-
-			Await.ready(started, deadline.timeLeft());
-
-			// make the job manager the leader --> this triggers the recovery of all jobs
-			myLeaderElectionService.isLeader(leaderSessionID);
-
-			// check that we did not recover any jobs
-			assertThat(recoveredJobs, is(empty()));
-
-			// verify that the JobManager terminated
-			testProbe.expectTerminated(jobManager, timeout);
-		} finally {
-			TestingUtils.stopActor(jobManager);
-		}
-	}
-
-	static class TestingFailingHAJobManager extends JobManager {
-
-		private final Collection<JobID> recoveredJobs;
-
-		public TestingFailingHAJobManager(
-			Configuration flinkConfiguration,
-			ScheduledExecutorService futureExecutor,
-			Executor ioExecutor,
-			InstanceManager instanceManager,
-			Scheduler scheduler,
-			BlobServer blobServer,
-			BlobLibraryCacheManager libraryCacheManager,
-			ActorRef archive,
-			RestartStrategyFactory restartStrategyFactory,
-			FiniteDuration timeout,
-			LeaderElectionService leaderElectionService,
-			SubmittedJobGraphStore submittedJobGraphs,
-			CheckpointRecoveryFactory checkpointRecoveryFactory,
-			FiniteDuration jobRecoveryTimeout,
-			JobManagerMetricGroup jobManagerMetricGroup,
-			Collection<JobID> recoveredJobs) {
-			super(
-				flinkConfiguration,
-				futureExecutor,
-				ioExecutor,
-				instanceManager,
-				scheduler,
-				blobServer,
-				libraryCacheManager,
-				archive,
-				restartStrategyFactory,
-				timeout,
-				leaderElectionService,
-				submittedJobGraphs,
-				checkpointRecoveryFactory,
-				jobRecoveryTimeout,
-				jobManagerMetricGroup,
-				Option.<String>empty());
-
-			this.recoveredJobs = recoveredJobs;
-		}
-
-		@Override
-		public PartialFunction<Object, BoxedUnit> handleMessage() {
-			return ReceiveBuilder.match(
-				JobManagerMessages.RecoverSubmittedJob.class,
-				new FI.UnitApply<JobManagerMessages.RecoverSubmittedJob>() {
-					@Override
-					public void apply(JobManagerMessages.RecoverSubmittedJob submitJob) throws Exception {
-						recoveredJobs.add(submitJob.submittedJobGraph().getJobId());
-					}
-				}).matchAny(new FI.UnitApply<Object>() {
-				@Override
-				public void apply(Object o) throws Exception {
-					TestingFailingHAJobManager.super.handleMessage().apply(o);
-				}
-			}).build();
-		}
-	}
-
-	public static class BlockingInvokable extends AbstractInvokable {
-
-		private static final OneShotLatch LATCH = new OneShotLatch();
-
-		public BlockingInvokable(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-
-			OperatorID operatorID = OperatorID.fromJobVertexID(getEnvironment().getJobVertexId());
-			TaskStateManager taskStateManager = getEnvironment().getTaskStateManager();
-			PrioritizedOperatorSubtaskState subtaskState = taskStateManager.prioritizedOperatorState(operatorID);
-
-			int subtaskIndex = getIndexInSubtaskGroup();
-			if (subtaskIndex < BlockingStatefulInvokable.recoveredStates.length) {
-				Iterator<OperatorStateHandle> iterator =
-					subtaskState.getJobManagerManagedOperatorState().iterator();
-
-				if (iterator.hasNext()) {
-					OperatorStateHandle operatorStateHandle = iterator.next();
-					try (FSDataInputStream in = operatorStateHandle.openInputStream()) {
-						BlockingStatefulInvokable.recoveredStates[subtaskIndex] =
-							InstantiationUtil.deserializeObject(in, getUserCodeClassLoader());
-					}
-				}
-				Assert.assertFalse(iterator.hasNext());
-			}
-
-			LATCH.await();
-		}
-
-		public static void unblock() {
-			LATCH.trigger();
-		}
-	}
-
-	public static class BlockingStatefulInvokable extends BlockingInvokable {
-
-		private static final int NUM_CHECKPOINTS_TO_COMPLETE = 5;
-
-		private static volatile CountDownLatch completedCheckpointsLatch = new CountDownLatch(1);
-
-		static volatile long[] recoveredStates = new long[0];
-
-		private int completedCheckpoints = 0;
-
-		public BlockingStatefulInvokable(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
-			ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
-					String.valueOf(UUID.randomUUID()),
-					InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
-
-			Map<String, OperatorStateHandle.StateMetaInfo> stateNameToPartitionOffsets = new HashMap<>(1);
-			stateNameToPartitionOffsets.put(
-				"test-state",
-				new OperatorStateHandle.StateMetaInfo(new long[]{0L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-
-			OperatorStateHandle operatorStateHandle = new OperatorStreamStateHandle(stateNameToPartitionOffsets, byteStreamStateHandle);
-
-			TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot();
-			checkpointStateHandles.putSubtaskStateByOperatorID(
-				OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()),
-				new OperatorSubtaskState(
-					StateObjectCollection.singleton(operatorStateHandle),
-					StateObjectCollection.empty(),
-					StateObjectCollection.empty(),
-					StateObjectCollection.empty()));
-
-			getEnvironment().acknowledgeCheckpoint(
-					checkpointMetaData.getCheckpointId(),
-					new CheckpointMetrics(0L, 0L, 0L, 0L),
-					checkpointStateHandles);
-			return true;
-		}
-
-		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
-			throw new UnsupportedOperationException("should not be called!");
-		}
-
-		@Override
-		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
-			throw new UnsupportedOperationException("should not be called!");
-		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) {
-			if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) {
-				completedCheckpointsLatch.countDown();
-			}
-		}
-
-		static void initializeStaticHelpers(int numSubtasks) {
-			completedCheckpointsLatch = new CountDownLatch(numSubtasks);
-			recoveredStates = new long[numSubtasks];
-		}
-
-		static void awaitCompletedCheckpoints() throws InterruptedException {
-			completedCheckpointsLatch.await();
-		}
-
-		static long[] getRecoveredStates() {
-			return recoveredStates;
-		}
-
-		private static OperatorStateHandle extractSingletonOperatorState(TaskStateSnapshot taskStateHandles) {
-			Set<Map.Entry<OperatorID, OperatorSubtaskState>> subtaskStateMappings = taskStateHandles.getSubtaskStateMappings();
-			Preconditions.checkNotNull(subtaskStateMappings);
-			Preconditions.checkState(subtaskStateMappings.size()  == 1);
-			OperatorSubtaskState subtaskState = subtaskStateMappings.iterator().next().getValue();
-			Collection<OperatorStateHandle> managedOperatorState =
-				Preconditions.checkNotNull(subtaskState).getManagedOperatorState();
-			Preconditions.checkNotNull(managedOperatorState);
-			Preconditions.checkState(managedOperatorState.size()  == 1);
-			return managedOperatorState.iterator().next();
-		}
-	}
-}