You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:06 UTC

[flink] 03/09: [FLINK-10255] Only react to onAddedJobGraph signal when being leader

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3e5d07ca349a7b010bc47d1cce9b9ad3208f55a6
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sat Sep 8 15:19:37 2018 +0200

    [FLINK-10255] Only react to onAddedJobGraph signal when being leader
    
    The Dispatcher should only react to the onAddedJobGraph signal if it is the leader.
    In all other cases the signal should be ignored since the jobs will be recovered once
    the Dispatcher becomes the leader.
    
    In order to still support non-blocking job recoveries, this commit serializes all
    recovery operations by introducing a recoveryOperation future which first needs to
    complete before a subsequent operation is started. That way we can avoid race conditions
    between granting and revoking leadership as well as the onAddedJobGraph signals. This is
    important since we can only lock each JobGraph once and, thus, need to make sure that
    we don't release a lock of a properly recovered job in a concurrent operation.
    
    This closes #6678.
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 172 +++++++++++++++------
 .../zookeeper/ZooKeeperStateHandleStore.java       |  11 +-
 .../flink/runtime/dispatcher/DispatcherHATest.java |   4 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   |  10 +-
 .../runtime/dispatcher/TestingDispatcher.java      |  11 +-
 .../dispatcher/ZooKeeperHADispatcherTest.java      | 160 +++++++++++++++++--
 6 files changed, 290 insertions(+), 78 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c31e64c..40857aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -60,11 +60,12 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPre
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiFunctionWithException;
 import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.FunctionUtils;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -128,6 +129,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;
 
+	private CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);
+
 	public Dispatcher(
 			RpcService rpcService,
 			String endpointId,
@@ -629,31 +632,51 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	 * Recovers all jobs persisted via the submitted job graph store.
 	 */
 	@VisibleForTesting
-	CompletableFuture<Collection<JobGraph>> recoverJobs() {
+	Collection<JobGraph> recoverJobs() throws Exception {
 		log.info("Recovering all persisted jobs.");
-		return FutureUtils.supplyAsync(
-			() -> {
-				final Collection<JobID> jobIds = submittedJobGraphStore.getJobIds();
-
-				final List<JobGraph> jobGraphs = new ArrayList<>(jobIds.size());
+		final Collection<JobID> jobIds = submittedJobGraphStore.getJobIds();
 
-				for (JobID jobId : jobIds) {
-					jobGraphs.add(recoverJob(jobId));
+		try {
+			return recoverJobGraphs(jobIds);
+		} catch (Exception e) {
+			// release all recovered job graphs
+			for (JobID jobId : jobIds) {
+				try {
+					submittedJobGraphStore.releaseJobGraph(jobId);
+				} catch (Exception ie) {
+					e.addSuppressed(ie);
 				}
+			}
+			throw e;
+		}
+	}
 
-				return jobGraphs;
-			},
-			getRpcService().getExecutor());
+	@Nonnull
+	private Collection<JobGraph> recoverJobGraphs(Collection<JobID> jobIds) throws Exception {
+		final List<JobGraph> jobGraphs = new ArrayList<>(jobIds.size());
+
+		for (JobID jobId : jobIds) {
+			final JobGraph jobGraph = recoverJob(jobId);
+
+			if (jobGraph == null) {
+				throw new FlinkJobNotFoundException(jobId);
+			}
+
+			jobGraphs.add(jobGraph);
+		}
+
+		return jobGraphs;
 	}
 
+	@Nullable
 	private JobGraph recoverJob(JobID jobId) throws Exception {
 		log.debug("Recover job {}.", jobId);
-		SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
+		final SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
 
 		if (submittedJobGraph != null) {
 			return submittedJobGraph.getJobGraph();
 		} else {
-			throw new FlinkJobNotFoundException(jobId);
+			return null;
 		}
 	}
 
@@ -768,27 +791,40 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	 */
 	@Override
 	public void grantLeadership(final UUID newLeaderSessionID) {
-		log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
+		runAsyncWithoutFencing(
+			() -> {
+				log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
 
-		final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();
+				final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoveryOperation.thenApplyAsync(
+					FunctionUtils.uncheckedFunction(ignored -> recoverJobs()),
+					getRpcService().getExecutor());
 
-		final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
-			(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
-			getUnfencedMainThreadExecutor());
+				final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
+					(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
+					getUnfencedMainThreadExecutor());
 
-		final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenAcceptAsync(
-			(Boolean confirmLeadership) -> {
-				if (confirmLeadership) {
-					leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
-				}
-			},
-			getRpcService().getExecutor());
+				final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenCombineAsync(
+					recoveredJobsFuture,
+					(BiFunctionWithException<Boolean, Collection<JobGraph>, Void, Exception>) (Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
+						if (confirmLeadership) {
+							leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+						} else {
+							for (JobGraph recoveredJob : recoveredJobs) {
+								submittedJobGraphStore.releaseJobGraph(recoveredJob.getJobID());
+							}
+						}
+						return null;
+					},
+					getRpcService().getExecutor());
+
+				confirmationFuture.whenComplete(
+					(Void ignored, Throwable throwable) -> {
+						if (throwable != null) {
+							onFatalError(ExceptionUtils.stripCompletionException(throwable));
+						}
+					});
 
-		confirmationFuture.whenComplete(
-			(Void ignored, Throwable throwable) -> {
-				if (throwable != null) {
-					onFatalError(ExceptionUtils.stripCompletionException(throwable));
-				}
+				recoveryOperation = confirmationFuture;
 			});
 	}
 
@@ -829,7 +865,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			getMainThreadExecutor());
 	}
 
-	protected CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+	CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
 		if (jobManagerRunners.containsKey(jobId)) {
 			return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
 		} else {
@@ -837,6 +873,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		}
 	}
 
+	@VisibleForTesting
+	CompletableFuture<Void> getRecoveryOperation() {
+		return recoveryOperation;
+	}
+
 	private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
 		// clear the state if we've been the leader before
 		if (getFencingToken() != null) {
@@ -879,24 +920,63 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	@Override
 	public void onAddedJobGraph(final JobID jobId) {
-		final CompletableFuture<SubmittedJobGraph> recoveredJob = getRpcService().execute(
-			() -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-		final CompletableFuture<Acknowledge> submissionFuture = recoveredJob.thenComposeAsync(
-			(SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-			getMainThreadExecutor());
-
-		submissionFuture.whenComplete(
-			(Acknowledge acknowledge, Throwable throwable) -> {
-				if (throwable != null) {
-					onFatalError(
-						new DispatcherException(
-							String.format("Could not start the added job %s", jobId),
-							ExceptionUtils.stripCompletionException(throwable)));
+		runAsync(
+			() -> {
+				if (!jobManagerRunners.containsKey(jobId)) {
+					// IMPORTANT: onAddedJobGraph can generate false positives and, thus, we must expect that
+					// the specified job is already removed from the SubmittedJobGraphStore. In this case,
+					// SubmittedJobGraphStore.recoverJob returns null.
+					final CompletableFuture<Optional<JobGraph>> recoveredJob = recoveryOperation.thenApplyAsync(
+						FunctionUtils.uncheckedFunction(ignored -> Optional.ofNullable(recoverJob(jobId))),
+						getRpcService().getExecutor());
+
+					final DispatcherId dispatcherId = getFencingToken();
+					final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
+						(Optional<JobGraph> jobGraphOptional) -> jobGraphOptional.map(
+							FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, dispatcherId).thenAcceptAsync(
+								(ConsumerWithException<Boolean, Exception>) (Boolean isRecoveredJobRunning) -> {
+										if (!isRecoveredJobRunning) {
+											submittedJobGraphStore.releaseJobGraph(jobId);
+										}
+									},
+									getRpcService().getExecutor())))
+							.orElse(CompletableFuture.completedFuture(null)),
+						getUnfencedMainThreadExecutor());
+
+					submissionFuture.whenComplete(
+						(Void ignored, Throwable throwable) -> {
+							if (throwable != null) {
+								onFatalError(
+									new DispatcherException(
+										String.format("Could not start the added job %s", jobId),
+										ExceptionUtils.stripCompletionException(throwable)));
+							}
+						});
+
+					recoveryOperation = submissionFuture;
 				}
 			});
 	}
 
+	private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
+		if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
+			final JobID jobId = jobGraph.getJobID();
+			if (jobManagerRunners.containsKey(jobId)) {
+				// we must not release the job graph lock since it can only be locked once and
+				// is currently being executed. Once we support multiple locks, we must release
+				// the JobGraph here
+				log.debug("Ignore added JobGraph because the job {} is already running.", jobId);
+				return CompletableFuture.completedFuture(true);
+			} else if (runningJobsRegistry.getJobSchedulingStatus(jobId) != RunningJobsRegistry.JobSchedulingStatus.DONE) {
+				return waitForTerminatingJobManager(jobId, jobGraph, this::runJob).thenApply(ignored -> true);
+			} else {
+				log.debug("Ignore added JobGraph because the job {} has already been completed.", jobId);
+			}
+		}
+
+		return CompletableFuture.completedFuture(false);
+	}
+
 	@Override
 	public void onRemovedJobGraph(final JobID jobId) {
 		runAsync(() -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 8c3d31f..b9cd0c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -526,22 +526,13 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 				client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
 			} catch (KeeperException.NodeExistsException ignored) {
 				// we have already created the lock
-			} catch (KeeperException.NoNodeException e) {
-				throw new Exception("Cannot lock the node " + path + " since it does not exist.", e);
 			}
 		}
 
 		boolean success = false;
 
 		try {
-			byte[] data;
-
-			try {
-				data = client.getData().forPath(path);
-			} catch (Exception e) {
-				throw new Exception("Failed to retrieve state handle data under " + path +
-					" from ZooKeeper.", e);
-			}
+			byte[] data = client.getData().forPath(path);
 
 			try {
 				RetrievableStateHandle<T> retrievableStateHandle = InstantiationUtil.deserializeObject(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index cb26f48..335199a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -302,9 +302,7 @@ public class DispatcherHATest extends TestLogger {
 		}
 
 		@Override
-		public void releaseJobGraph(JobID jobId) throws Exception {
-			throw new UnsupportedOperationException("Should not be called.");
-		}
+		public void releaseJobGraph(JobID jobId) throws Exception {}
 
 		@Override
 		public Collection<JobID> getJobIds() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d405fcd..1af10b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -319,13 +319,13 @@ public class DispatcherTest extends TestLogger {
 		runningJobsRegistry.setJobFinished(TEST_JOB_ID);
 		dispatcher.onAddedJobGraph(TEST_JOB_ID);
 
-		final CompletableFuture<Throwable> errorFuture = fatalErrorHandler.getErrorFuture();
-
-		final Throwable throwable = errorFuture.get();
+		// wait until the recovery is over
+		dispatcher.getRecoverOperationFuture(TIMEOUT).get();
 
-		assertThat(throwable, instanceOf(DispatcherException.class));
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-		fatalErrorHandler.clearError();
+		// check that we did not start executing the added JobGraph
+		assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), is(empty()));
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 5141be0..6a62376 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.dispatcher;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -73,16 +72,20 @@ class TestingDispatcher extends Dispatcher {
 			VoidHistoryServerArchivist.INSTANCE);
 	}
 
-	@VisibleForTesting
 	void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
 		runAsync(
 			() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
 	}
 
-	@VisibleForTesting
-	public CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
+	CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
 		return callAsyncWithoutFencing(
 			() -> getJobTerminationFuture(jobId),
 			timeout).thenCompose(Function.identity());
 	}
+
+	CompletableFuture<Void> getRecoverOperationFuture(@Nonnull Time timeout) {
+		return callAsyncWithoutFencing(
+			this::getRecoveryOperation,
+			timeout).thenCompose(Function.identity());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index dd03758..b5662c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -24,17 +24,25 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
@@ -56,8 +64,13 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -67,8 +80,8 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 
 	private static final Time TIMEOUT = Time.seconds(10L);
 
-	@ClassRule
-	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+	@Rule
+	public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
 
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@@ -87,14 +100,14 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 	@BeforeClass
 	public static void setupClass() throws IOException {
 		configuration = new Configuration();
-		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
 		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
 		rpcService = new TestingRpcService();
 		blobServer = new BlobServer(configuration, new VoidBlobStore());
 	}
 
 	@Before
-	public void setup() {
+	public void setup() throws Exception {
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
 		testingFatalErrorHandler = new TestingFatalErrorHandler();
 	}
 
@@ -139,7 +152,9 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 			final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
 			testingHighAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
 
-			final TestingDispatcher dispatcher = createDispatcher(testingHighAvailabilityServices);
+			final TestingDispatcher dispatcher = createDispatcher(
+				testingHighAvailabilityServices,
+				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
 
 			dispatcher.start();
 
@@ -167,7 +182,7 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 				// recover the job
 				final SubmittedJobGraph submittedJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobId);
 
-				assertThat(submittedJobGraph, Matchers.is(Matchers.notNullValue()));
+				assertThat(submittedJobGraph, is(notNullValue()));
 
 				// check that the other submitted job graph store can remove the job graph after the original leader
 				// has lost its leadership
@@ -184,20 +199,145 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that a standby Dispatcher does not interfere with the clean up of a completed
+	 * job.
+	 */
+	@Test
+	public void testStandbyDispatcherJobExecution() throws Exception {
+		try (final TestingHighAvailabilityServices haServices1 = new TestingHighAvailabilityServices();
+			final TestingHighAvailabilityServices haServices2 = new TestingHighAvailabilityServices();
+			final CuratorFramework curatorFramework = ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+			final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore1 = ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration);
+			haServices1.setSubmittedJobGraphStore(submittedJobGraphStore1);
+			final TestingLeaderElectionService leaderElectionService1 = new TestingLeaderElectionService();
+			haServices1.setDispatcherLeaderElectionService(leaderElectionService1);
+
+			final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore2 = ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration);
+			haServices2.setSubmittedJobGraphStore(submittedJobGraphStore2);
+			final TestingLeaderElectionService leaderElectionService2 = new TestingLeaderElectionService();
+			haServices2.setDispatcherLeaderElectionService(leaderElectionService2);
+
+			final CompletableFuture<JobGraph> jobGraphFuture = new CompletableFuture<>();
+			final CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
+			final TestingDispatcher dispatcher1 = createDispatcher(
+				haServices1,
+				new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture));
+
+			final TestingDispatcher dispatcher2 = createDispatcher(
+				haServices2,
+				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
+
+			try {
+				dispatcher1.start();
+				dispatcher2.start();
+
+				leaderElectionService1.isLeader(UUID.randomUUID()).get();
+				final DispatcherGateway dispatcherGateway1 = dispatcher1.getSelfGateway(DispatcherGateway.class);
+
+				final JobGraph jobGraph = DispatcherHATest.createNonEmptyJobGraph();
+
+				dispatcherGateway1.submitJob(jobGraph, TIMEOUT).get();
+
+				final CompletableFuture<JobResult> jobResultFuture = dispatcherGateway1.requestJobResult(jobGraph.getJobID(), TIMEOUT);
+
+				jobGraphFuture.get();
+
+				// complete the job
+				resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build());
+
+				final JobResult jobResult = jobResultFuture.get();
+
+				assertThat(jobResult.isSuccess(), is(true));
+
+				// wait for the completion of the job
+				dispatcher1.getJobTerminationFuture(jobGraph.getJobID(), TIMEOUT).get();
+
+				// change leadership
+				leaderElectionService1.notLeader();
+				leaderElectionService2.isLeader(UUID.randomUUID()).get();
+
+				// Dispatcher 2 should not recover any jobs
+				final DispatcherGateway dispatcherGateway2 = dispatcher2.getSelfGateway(DispatcherGateway.class);
+				assertThat(dispatcherGateway2.listJobs(TIMEOUT).get(), is(empty()));
+			} finally {
+				RpcUtils.terminateRpcEndpoint(dispatcher1, TIMEOUT);
+				RpcUtils.terminateRpcEndpoint(dispatcher2, TIMEOUT);
+			}
+		}
+	}
+
+	/**
+	 * Tests that a standby {@link Dispatcher} can recover all submitted jobs.
+	 */
+	@Test
+	public void testStandbyDispatcherJobRecovery() throws Exception {
+		try (CuratorFramework curatorFramework = ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+			HighAvailabilityServices haServices = null;
+			Dispatcher dispatcher1 = null;
+			Dispatcher dispatcher2 = null;
+
+			try {
+				haServices = new ZooKeeperHaServices(curatorFramework, rpcService.getExecutor(), configuration, new VoidBlobStore());
+
+				final CompletableFuture<JobGraph> jobGraphFuture1 = new CompletableFuture<>();
+				dispatcher1 = createDispatcher(
+					haServices,
+					new TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>()));
+				final CompletableFuture<JobGraph> jobGraphFuture2 = new CompletableFuture<>();
+				dispatcher2 = createDispatcher(
+					haServices,
+					new TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>()));
+
+				dispatcher1.start();
+				dispatcher2.start();
+
+				final LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), TIMEOUT);
+
+				final DispatcherGateway dispatcherGateway = rpcService.connect(leaderConnectionInfo.getAddress(), DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()), DispatcherGateway.class).get();
+
+				final JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
+				dispatcherGateway.submitJob(nonEmptyJobGraph, TIMEOUT).get();
+
+				if (dispatcher1.getAddress().equals(leaderConnectionInfo.getAddress())) {
+					dispatcher1.shutDown();
+					assertThat(jobGraphFuture2.get().getJobID(), is(equalTo(nonEmptyJobGraph.getJobID())));
+				} else {
+					dispatcher2.shutDown();
+					assertThat(jobGraphFuture1.get().getJobID(), is(equalTo(nonEmptyJobGraph.getJobID())));
+				}
+			} finally {
+				if (dispatcher1 != null) {
+					RpcUtils.terminateRpcEndpoint(dispatcher1, TIMEOUT);
+				}
+
+				if (dispatcher2 != null) {
+					RpcUtils.terminateRpcEndpoint(dispatcher2, TIMEOUT);
+				}
+
+				if (haServices != null) {
+					haServices.close();
+				}
+			}
+		}
+	}
+
 	@Nonnull
-	private TestingDispatcher createDispatcher(TestingHighAvailabilityServices testingHighAvailabilityServices) throws Exception {
+	private TestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		return new TestingDispatcher(
 			rpcService,
-			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
+			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName() + UUID.randomUUID(),
 			configuration,
-			testingHighAvailabilityServices,
+			highAvailabilityServices,
 			new TestingResourceManagerGateway(),
 			blobServer,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
+			jobManagerRunnerFactory,
 			testingFatalErrorHandler);
 	}
 }