You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink that did not survive the conversion to plain text.)
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/12 14:50:19 UTC

[GitHub] tillrohrmann closed pull request #6589: [Backport 1.5][FLINK-10011] Release JobGraph from SubmittedJobGraphStore

tillrohrmann closed pull request #6589: [Backport 1.5][FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6589
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it for merged pull requests from forks:

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb833a3a..45d11412c50 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Exec
 
 		ZooKeeperStateHandleStore<MesosWorkerStore.Worker> zooKeeperStateHandleStore = zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
 			"/workers",
-			stateStorageHelper,
-			executor);
+			stateStorageHelper);
 
 		ZooKeeperSharedValue frameworkId = zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
 		ZooKeeperSharedCount totalTaskCount = zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
index f2f905971c3..9a992814235 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.akka;
 
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 
 import akka.actor.ActorRef;
 import akka.actor.Kill;
+import akka.actor.PoisonPill;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,5 +87,13 @@
 		return FutureUtils.completeAll(terminationFutures);
 	}
 
+	public static void stopActor(AkkaActorGateway akkaActorGateway) {
+		stopActor(akkaActorGateway.actor());
+	}
+
+	public static void stopActor(ActorRef actorRef) {
+		actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+	}
+
 	private ActorUtils() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index f22127041d3..131733924ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,14 +25,13 @@
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ConsumerWithException;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +85,8 @@
 	 */
 	private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
+	private final Executor executor;
+
 	/**
 	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 	 *
@@ -98,7 +99,7 @@
 	 *                                       start with a '/')
 	 * @param stateStorage                   State storage to be used to persist the completed
 	 *                                       checkpoint
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
+	 * @param executor to execute blocking calls
 	 * @throws Exception
 	 */
 	public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +124,12 @@ public ZooKeeperCompletedCheckpointStore(
 		// All operations will have the path as root
 		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
 
-		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
+		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);
 
 		this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 
+		this.executor = checkNotNull(executor);
+
 		LOG.info("Initialized in '{}'.", checkpointsPath);
 	}
 
@@ -236,16 +239,30 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
 
 		// Everything worked, let's remove a previous checkpoint if necessary.
 		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
-			try {
-				removeSubsumed(completedCheckpoints.removeFirst());
-			} catch (Exception e) {
-				LOG.warn("Failed to subsume the old checkpoint", e);
-			}
+			final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
+			tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume);
 		}
 
 		LOG.debug("Added {} to {}.", checkpoint, path);
 	}
 
+	private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> discardCallback) {
+		try {
+			if (tryRemove(completedCheckpoint.getCheckpointID())) {
+				executor.execute(() -> {
+					try {
+						discardCallback.accept(completedCheckpoint);
+					} catch (Exception e) {
+						LOG.warn("Could not discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
+					}
+				});
+
+			}
+		} catch (Exception e) {
+			LOG.warn("Failed to subsume the old checkpoint", e);
+		}
+	}
+
 	@Override
 	public CompletedCheckpoint getLatestCheckpoint() {
 		if (completedCheckpoints.isEmpty()) {
@@ -278,11 +295,9 @@ public void shutdown(JobStatus jobStatus) throws Exception {
 			LOG.info("Shutting down");
 
 			for (CompletedCheckpoint checkpoint : completedCheckpoints) {
-				try {
-					removeShutdown(checkpoint, jobStatus);
-				} catch (Exception e) {
-					LOG.error("Failed to discard checkpoint.", e);
-				}
+				tryRemoveCompletedCheckpoint(
+					checkpoint,
+					completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus));
 			}
 
 			completedCheckpoints.clear();
@@ -305,59 +320,13 @@ public void shutdown(JobStatus jobStatus) throws Exception {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Removes a subsumed checkpoint from ZooKeeper and drops the state.
-	 */
-	private void removeSubsumed(
-		final CompletedCheckpoint completedCheckpoint) throws Exception {
-
-		if (completedCheckpoint == null) {
-			return;
-		}
-
-		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action =
-			new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-				@Override
-				public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-					if (value != null) {
-						try {
-							completedCheckpoint.discardOnSubsume();
-						} catch (Exception e) {
-							throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
-						}
-					}
-				}
-			};
-
-		checkpointsInZooKeeper.releaseAndTryRemove(
-			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-			action);
-	}
-
-	/**
-	 * Removes a checkpoint from ZooKeeper because of Job shutdown and drops the state.
+	 * Tries to remove the checkpoint identified by the given checkpoint id.
+	 *
+	 * @param checkpointId identifying the checkpoint to remove
+	 * @return true if the checkpoint could be removed
 	 */
-	private void removeShutdown(
-			final CompletedCheckpoint completedCheckpoint,
-			final JobStatus jobStatus) throws Exception {
-
-		if (completedCheckpoint == null) {
-			return;
-		}
-
-		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-			@Override
-			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-				try {
-					completedCheckpoint.discardOnShutdown(jobStatus);
-				} catch (Exception e) {
-					throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
-				}
-			}
-		};
-
-		checkpointsInZooKeeper.releaseAndTryRemove(
-			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-			removeAction);
+	private boolean tryRemove(long checkpointId) throws Exception {
+		return checkpointsInZooKeeper.releaseAndTryRemove(checkpointIdToPath(checkpointId));
 	}
 
 	/**
@@ -381,7 +350,7 @@ public static long pathToCheckpointId(String path) {
 			String numberString;
 
 			// check if we have a leading slash
-			if ('/' == path.charAt(0) ) {
+			if ('/' == path.charAt(0)) {
 				numberString = path.substring(1);
 			} else {
 				numberString = path;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c96acbd3192..c31e64c0adc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -572,30 +572,38 @@ private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableF
 		}
 
 		return jobManagerRunnerTerminationFuture.thenRunAsync(
-			() -> {
-				jobManagerMetricGroup.removeJob(jobId);
+			() -> cleanUpJobData(jobId, cleanupHA),
+			getRpcService().getExecutor());
+	}
 
-				boolean cleanupHABlobs = false;
-				if (cleanupHA) {
-					try {
-						submittedJobGraphStore.removeJobGraph(jobId);
+	private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
+		jobManagerMetricGroup.removeJob(jobId);
 
-						// only clean up the HA blobs if we could remove the job from HA storage
-						cleanupHABlobs = true;
-					} catch (Exception e) {
-						log.warn("Could not properly remove job {} from submitted job graph store.", jobId, e);
-					}
+		boolean cleanupHABlobs = false;
+		if (cleanupHA) {
+			try {
+				submittedJobGraphStore.removeJobGraph(jobId);
 
-					try {
-						runningJobsRegistry.clearJob(jobId);
-					} catch (IOException e) {
-						log.warn("Could not properly remove job {} from the running jobs registry.", jobId, e);
-					}
-				}
+				// only clean up the HA blobs if we could remove the job from HA storage
+				cleanupHABlobs = true;
+			} catch (Exception e) {
+				log.warn("Could not properly remove job {} from submitted job graph store.", jobId, e);
+			}
 
-				blobServer.cleanupJob(jobId, cleanupHABlobs);
-			},
-			getRpcService().getExecutor());
+			try {
+				runningJobsRegistry.clearJob(jobId);
+			} catch (IOException e) {
+				log.warn("Could not properly remove job {} from the running jobs registry.", jobId, e);
+			}
+		} else {
+			try {
+				submittedJobGraphStore.releaseJobGraph(jobId);
+			} catch (Exception e) {
+				log.warn("Could not properly release job {} from submitted job graph store.", jobId, e);
+			}
+		}
+
+		blobServer.cleanupJob(jobId, cleanupHABlobs);
 	}
 
 	/**
@@ -806,8 +814,7 @@ public void grantLeadership(final UUID newLeaderSessionID) {
 	}
 
 	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
-		final CompletableFuture<Void> jobManagerTerminationFuture = jobManagerTerminationFutures
-			.getOrDefault(jobId, CompletableFuture.completedFuture(null))
+		final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
 			.exceptionally((Throwable throwable) -> {
 				throw new CompletionException(
 					new DispatcherException(
@@ -822,6 +829,14 @@ public void grantLeadership(final UUID newLeaderSessionID) {
 			getMainThreadExecutor());
 	}
 
+	protected CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+		if (jobManagerRunners.containsKey(jobId)) {
+			return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
+		} else {
+			return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
+		}
+	}
+
 	private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
 		// clear the state if we've been the leader before
 		if (getFencingToken() != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
index 26d3abc2f1f..fe7f5f14cc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
@@ -66,12 +66,17 @@ public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
 	}
 
 	@Override
-	public void removeJobGraph(JobID jobId) throws Exception {
+	public void removeJobGraph(JobID jobId) {
 		// ignore
 	}
 
 	@Override
-	public Collection<JobID> getJobIds() throws Exception {
+	public void releaseJobGraph(JobID jobId) {
+		// ignore
+	}
+
+	@Override
+	public Collection<JobID> getJobIds() {
 		return Collections.singleton(jobGraph.getJobID());
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 3882479ce95..ea96d7d43d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -179,7 +179,7 @@ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
 
 	@Override
 	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
-		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
+		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index d1ca1a38853..f28621f0d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -43,22 +43,27 @@ public void stop() {
 	}
 
 	@Override
-	public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+	public void putJobGraph(SubmittedJobGraph jobGraph) {
 		// Nothing to do
 	}
 
 	@Override
-	public void removeJobGraph(JobID jobId) throws Exception {
+	public void removeJobGraph(JobID jobId) {
 		// Nothing to do
 	}
 
 	@Override
-	public Collection<JobID> getJobIds() throws Exception {
+	public void releaseJobGraph(JobID jobId) {
+		// nothing to do
+	}
+
+	@Override
+	public Collection<JobID> getJobIds() {
 		return Collections.emptyList();
 	}
 
 	@Override
-	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+	public SubmittedJobGraph recoverJobGraph(JobID jobId) {
 		return null;
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index 7e624ec6e1d..b40a4a2b95f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import javax.annotation.Nullable;
 
@@ -58,6 +59,17 @@
 	 */
 	void removeJobGraph(JobID jobId) throws Exception;
 
+	/**
+	 * Releases the locks on the specified {@link JobGraph}.
+	 *
+	 * Releasing the locks allows that another instance can delete the job from
+	 * the {@link SubmittedJobGraphStore}.
+	 *
+	 * @param jobId specifying the job to release the locks for
+	 * @throws Exception if the locks cannot be released
+	 */
+	void releaseJobGraph(JobID jobId) throws Exception;
+
 	/**
 	 * Get all job ids of submitted job graphs to the submitted job graph store.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 7ba5d481177..2b935af229a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -41,7 +41,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -69,13 +68,13 @@
 	/** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
 	private final Object cacheLock = new Object();
 
-	/** Client (not a namespace facade) */
+	/** Client (not a namespace facade). */
 	private final CuratorFramework client;
 
 	/** The set of IDs of all added job graphs. */
 	private final Set<JobID> addedJobGraphs = new HashSet<>();
 
-	/** Completed checkpoints in ZooKeeper */
+	/** Completed checkpoints in ZooKeeper. */
 	private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
 
 	/**
@@ -94,19 +93,17 @@
 	private boolean isRunning;
 
 	/**
-	 * Submitted job graph store backed by ZooKeeper
+	 * Submitted job graph store backed by ZooKeeper.
 	 *
 	 * @param client ZooKeeper client
 	 * @param currentJobsPath ZooKeeper path for current job graphs
 	 * @param stateStorage State storage used to persist the submitted jobs
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
 	 * @throws Exception
 	 */
 	public ZooKeeperSubmittedJobGraphStore(
 			CuratorFramework client,
 			String currentJobsPath,
-			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage,
-			Executor executor) throws Exception {
+			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
 
 		checkNotNull(currentJobsPath, "Current jobs path");
 		checkNotNull(stateStorage, "State storage");
@@ -123,7 +120,7 @@ public ZooKeeperSubmittedJobGraphStore(
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
 		this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
-		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
+		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
 		pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
@@ -276,6 +273,24 @@ public void removeJobGraph(JobID jobId) throws Exception {
 		LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 	}
 
+	@Override
+	public void releaseJobGraph(JobID jobId) throws Exception {
+		checkNotNull(jobId, "Job ID");
+		final String path = getPathForJob(jobId);
+
+		LOG.debug("Releasing locks of job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
+		synchronized (cacheLock) {
+			if (addedJobGraphs.contains(jobId)) {
+				jobGraphsInZooKeeper.release(path);
+
+				addedJobGraphs.remove(jobId);
+			}
+		}
+
+		LOG.info("Released locks of job graph {} from ZooKeeper.", jobId);
+	}
+
 	@Override
 	public Collection<JobID> getJobIds() throws Exception {
 		Collection<String> paths;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 43c930e6fea..cc1ec7044c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -54,6 +54,9 @@
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * Class containing helper functions to interact with ZooKeeper.
+ */
 public class ZooKeeperUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
@@ -227,14 +230,12 @@ public static ZooKeeperLeaderElectionService createLeaderElectionService(
 	 *
 	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 	 * @param configuration {@link Configuration} object
-	 * @param executor to run ZooKeeper callbacks
 	 * @return {@link ZooKeeperSubmittedJobGraphStore} instance
 	 * @throws Exception if the submitted job graph store cannot be created
 	 */
 	public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
 			CuratorFramework client,
-			Configuration configuration,
-			Executor executor) throws Exception {
+			Configuration configuration) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -244,7 +245,9 @@ public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
 		String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
-				client, zooKeeperSubmittedJobsPath, stateStorage, executor);
+			client,
+			zooKeeperSubmittedJobsPath,
+			stateStorage);
 	}
 
 	/**
@@ -344,6 +347,9 @@ public static String generateZookeeperPath(String root, String namespace) {
 		return root + namespace;
 	}
 
+	/**
+	 * Secure {@link ACLProvider} implementation.
+	 */
 	public static class SecureAclProvider implements ACLProvider {
 		@Override
 		public List<ACL> getDefaultAcl() {
@@ -356,6 +362,9 @@ public static String generateZookeeperPath(String root, String namespace) {
 		}
 	}
 
+	/**
+	 * ZooKeeper client ACL mode enum.
+	 */
 	public enum ZkClientACLMode {
 		CREATOR,
 		OPEN;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index f0d67fd2a87..f266e36471c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -18,17 +18,13 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -36,6 +32,7 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -44,7 +41,6 @@
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -68,13 +64,13 @@
  * State handle in ZooKeeper =&gt; State handle exists
  * </pre>
  *
- * But not:
+ * <p>But not:
  *
  * <pre>
  * State handle exists =&gt; State handle in ZooKeeper
  * </pre>
  *
- * There can be lingering state handles when failures happen during operation. They
+ * <p>There can be lingering state handles when failures happen during operation. They
  * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
  * FLINK-2513</a> about a possible way to overcome this).
  *
@@ -84,13 +80,11 @@
 
 	public static Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-	/** Curator ZooKeeper client */
+	/** Curator ZooKeeper client. */
 	private final CuratorFramework client;
 
 	private final RetrievableStateStorageHelper<T> storage;
 
-	private final Executor executor;
-
 	/** Lock node name of this ZooKeeperStateHandleStore. The name should be unique among all other state handle stores. */
 	private final String lockNode;
 
@@ -103,16 +97,13 @@
 	 *                            instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
 	 * @param storage to persist the actual state and whose returned state handle is then written
 	 *                to ZooKeeper
-	 * @param executor to run the ZooKeeper callbacks
 	 */
 	public ZooKeeperStateHandleStore(
 		CuratorFramework client,
-		RetrievableStateStorageHelper<T> storage,
-		Executor executor) {
+		RetrievableStateStorageHelper<T> storage) {
 
 		this.client = checkNotNull(client, "Curator client");
 		this.storage = checkNotNull(storage, "State storage");
-		this.executor = checkNotNull(executor);
 
 		// Generate a unique lock node name
 		lockNode = UUID.randomUUID().toString();
@@ -262,7 +253,7 @@ public int exists(String pathInZooKeeper) throws Exception {
 	public Collection<String> getAllPaths() throws Exception {
 		final String path = "/";
 
-		while(true) {
+		while (true) {
 			Stat stat = client.checkExists().forPath(path);
 
 			if (stat == null) {
@@ -393,33 +384,14 @@ public int exists(String pathInZooKeeper) throws Exception {
 
 	/**
 	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
-	 * The deletion of the state node is executed asynchronously.
-	 *
-	 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
-	 * has been executed.
-	 *
-	 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
-	 * @throws Exception If the ZooKeeper operation fails
-	 */
-	public void releaseAndTryRemove(String pathInZooKeeper) throws Exception {
-		releaseAndTryRemove(pathInZooKeeper, null);
-	}
-
-	/**
-	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
-	 * The deletion of the state node is executed asynchronously. After the state node has been deleted, the given
-	 * callback is called with the {@link RetrievableStateHandle} of the deleted state node.
-	 *
-	 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
-	 * has been executed.
+	 * It returns the {@link RetrievableStateHandle} stored under the given state node if any.
 	 *
 	 * @param pathInZooKeeper Path of state handle to remove
-	 * @param callback The callback to execute after a successful deletion. Null if no action needs to be executed.
-	 * @throws Exception If the ZooKeeper operation fails
+	 * @return True if the state handle could be released
+	 * @throws Exception If the ZooKeeper operation or discarding the state handle fails
 	 */
-	public void releaseAndTryRemove(
-			String pathInZooKeeper,
-			@Nullable final RemoveCallback<T> callback) throws Exception {
+	@Nullable
+	public boolean releaseAndTryRemove(String pathInZooKeeper) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
 		final String path = normalizePath(pathInZooKeeper);
@@ -429,14 +401,23 @@ public void releaseAndTryRemove(
 		try {
 			stateHandle = get(path, false);
 		} catch (Exception e) {
-			LOG.warn("Could not retrieve the state handle from node " + path + '.', e);
+			LOG.warn("Could not retrieve the state handle from node {}.", path, e);
 		}
 
 		release(pathInZooKeeper);
 
-		final BackgroundCallback backgroundCallback = new RemoveBackgroundCallback<>(stateHandle, callback, path);
+		try {
+			client.delete().forPath(path);
+		} catch (KeeperException.NotEmptyException ignored) {
+			LOG.debug("Could not delete znode {} because it is still locked.", path);
+			return false;
+		}
+
+		if (stateHandle != null) {
+			stateHandle.discardState();
+		}
 
-		client.delete().inBackground(backgroundCallback, executor).forPath(path);
+		return true;
 	}
 
 	/**
@@ -583,7 +564,7 @@ protected String getLockPath(String rootPath) {
 	}
 
 	/**
-	 * Makes sure that every path starts with a "/"
+	 * Makes sure that every path starts with a "/".
 	 *
 	 * @param path Path to normalize
 	 * @return Normalized path such that it starts with a "/"
@@ -595,103 +576,4 @@ private static String normalizePath(String path) {
 			return '/' + path;
 		}
 	}
-
-	// ---------------------------------------------------------------------------------------------------------
-	// Utility classes
-	// ---------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Callback which is executed when removing a node from ZooKeeper. The callback will call the given
-	 * {@link RemoveCallback} if it is not null. Afterwards, it will discard the given {@link RetrievableStateHandle}
-	 * if it is not null.
-	 *
-	 * @param <T> Type of the value stored in the RetrievableStateHandle
-	 */
-	private static final class RemoveBackgroundCallback<T extends Serializable> implements BackgroundCallback {
-		@Nullable
-		private final RetrievableStateHandle<T> stateHandle;
-
-		@Nullable
-		private final RemoveCallback<T> callback;
-
-		private final String pathInZooKeeper;
-
-		private RemoveBackgroundCallback(
-			@Nullable RetrievableStateHandle<T> stateHandle,
-			@Nullable RemoveCallback<T> callback,
-			String pathInZooKeeper) {
-
-			this.stateHandle = stateHandle;
-			this.callback = callback;
-			this.pathInZooKeeper = Preconditions.checkNotNull(pathInZooKeeper);
-		}
-
-		@Override
-		public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
-			try {
-				if (event.getType() == CuratorEventType.DELETE) {
-					final KeeperException.Code resultCode = KeeperException.Code.get(event.getResultCode());
-
-					if (resultCode == KeeperException.Code.OK) {
-						Exception exception = null;
-
-						if (null != callback) {
-							try {
-								callback.apply(stateHandle);
-							} catch (Throwable e) {
-								exception = new Exception("Could not execute delete action for node " +
-									pathInZooKeeper + '.', e);
-							}
-						}
-
-						if (stateHandle != null) {
-							try {
-								// Discard the state handle
-								stateHandle.discardState();
-							} catch (Throwable e) {
-								Exception newException = new Exception("Could not discard state handle of node " +
-									pathInZooKeeper + '.', e);
-
-								if (exception == null) {
-									exception = newException;
-								} else {
-									exception.addSuppressed(newException);
-								}
-							}
-						}
-
-						if (exception != null) {
-							throw exception;
-						}
-					} else if (resultCode == KeeperException.Code.NOTEMPTY) {
-						// Could not delete the node because it still contains children/locks
-						LOG.debug("Could not delete node " + pathInZooKeeper + " because it is still locked.");
-					} else {
-						throw new IllegalStateException("Unexpected result code " +
-							resultCode.name() + " in '" + event + "' callback.");
-					}
-				} else {
-					throw new IllegalStateException("Unexpected event type " +
-						event.getType() + " in '" + event + "' callback.");
-				}
-			} catch (Exception e) {
-				LOG.warn("Failed to run callback for delete operation on node " + pathInZooKeeper + '.', e);
-			}
-
-		}
-	}
-
-	/**
-	 * Callback interface for remove calls
-	 */
-	public interface RemoveCallback<T extends Serializable> {
-		/**
-		 * Callback method. The parameter can be null if the {@link RetrievableStateHandle} could not be retrieved
-		 * from ZooKeeper.
-		 *
-		 * @param value RetrievableStateHandle retrieved from ZooKeeper, null if it was not retrievable
-		 * @throws FlinkException If the callback failed
-		 */
-		void apply(@Nullable RetrievableStateHandle<T> value) throws FlinkException;
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
index d3b7dc5b379..3e294e0dbdd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.shared.SharedCount;
-import org.apache.curator.framework.recipes.shared.SharedValue;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+
 import java.io.Serializable;
-import java.util.concurrent.Executor;
 
 /**
  * Creates ZooKeeper utility classes without exposing the {@link CuratorFramework} dependency. The
@@ -71,7 +71,6 @@ public void close(boolean cleanup) throws Exception {
 	 *
 	 * @param zkStateHandleStorePath specifying the path in ZooKeeper to store the state handles to
 	 * @param stateStorageHelper storing the actual state data
-	 * @param executor to run asynchronous callbacks of the state handle store
 	 * @param <T> Type of the state to be stored
 	 * @return a ZooKeeperStateHandleStore instance
 	 * @throws Exception if ZooKeeper could not create the provided state handle store path in
@@ -79,8 +78,7 @@ public void close(boolean cleanup) throws Exception {
 	 */
 	public <T extends Serializable> ZooKeeperStateHandleStore<T> createZooKeeperStateHandleStore(
 			String zkStateHandleStorePath,
-			RetrievableStateStorageHelper<T> stateStorageHelper,
-			Executor executor) throws Exception {
+			RetrievableStateStorageHelper<T> stateStorageHelper) throws Exception {
 
 		facade.newNamespaceAwareEnsurePath(zkStateHandleStorePath).ensure(facade.getZookeeperClient());
 		CuratorFramework stateHandleStoreFacade = facade.usingNamespace(
@@ -88,7 +86,7 @@ public void close(boolean cleanup) throws Exception {
 				facade.getNamespace(),
 				zkStateHandleStorePath));
 
-		return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper, executor);
+		return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper);
 	}
 
 	/**
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1828baa17df..2000f7d961a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1728,21 +1728,22 @@ class JobManager(
     val futureOption = currentJobs.remove(jobID) match {
       case Some((eg, _)) =>
         val cleanUpFuture: Future[Unit] = Future {
-          val cleanupHABlobs = if (removeJobFromStateBackend) {
-            try {
+          val cleanupHABlobs = try {
+            if (removeJobFromStateBackend) {
               // ...otherwise, we can have lingering resources when there is a  concurrent shutdown
               // and the ZooKeeper client is closed. Not removing the job immediately allow the
               // shutdown to release all resources.
               submittedJobGraphs.removeJobGraph(jobID)
               true
-            } catch {
-              case t: Throwable => {
-                log.warn(s"Could not remove submitted job graph $jobID.", t)
-                false
-              }
+            } else {
+              submittedJobGraphs.releaseJobGraph(jobID)
+              false
+            }
+          } catch {
+            case t: Throwable => {
+              log.warn(s"Could not remove or release submitted job graph $jobID.", t)
+              false
             }
-          } else {
-            false
           }
 
           blobServer.cleanupJob(jobID, cleanupHABlobs)
@@ -1777,19 +1778,23 @@ class JobManager(
     */
   private def cancelAndClearEverything(cause: Throwable)
     : Seq[Future[Unit]] = {
-    val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
-      future {
-        eg.suspend(cause)
-        jobManagerMetricGroup.removeJob(eg.getJobID)
+    // iterate over a snapshot (toList) because removeJob removes entries from currentJobs
+    val futures = currentJobs.values.toList.flatMap(
+      egJobInfo => {
+        val executionGraph = egJobInfo._1
+        val jobInfo = egJobInfo._2
+
+        executionGraph.suspend(cause)
+
+        val jobId = executionGraph.getJobID
 
         jobInfo.notifyNonDetachedClients(
           decorateMessage(
             Failure(
-              new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))))
-      }(context.dispatcher)
-    }
+              new JobExecutionException(jobId, "All jobs are cancelled and cleared.", cause))))
 
-    currentJobs.clear()
+        removeJob(jobId, false)
+      })
 
     futures.toSeq
   }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 81569649663..c4d89030dc3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -193,7 +193,7 @@ public void testDiscardAllCheckpoints() throws Exception {
 
 	// ---------------------------------------------------------------------------------------------
 
-	protected TestCompletedCheckpoint createCheckpoint(
+	public static TestCompletedCheckpoint createCheckpoint(
 		int id,
 		SharedStateRegistry sharedStateRegistry) throws IOException {
 
@@ -226,7 +226,12 @@ protected void verifyCheckpointRegistered(Collection<OperatorState> operatorStat
 		}
 	}
 
-	protected void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
+	public static void verifyCheckpointDiscarded(TestCompletedCheckpoint completedCheckpoint) {
+		Assert.assertTrue(completedCheckpoint.isDiscarded());
+		verifyCheckpointDiscarded(completedCheckpoint.getOperatorStates().values());
+	}
+
+	protected static void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
 		for (OperatorState operatorState : operatorStates) {
 			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
 				Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).discarded);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
new file mode 100644
index 00000000000..1f7d3691e50
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.ErrorListenerPathable;
+import org.apache.curator.utils.EnsurePath;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Mockito based tests for the {@link ZooKeeperCompletedCheckpointStore}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
+public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
+
+	/**
+	 * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
+	 * and ignores those which cannot be retrieved via their state handles.
+	 *
+	 * <p>We have a timeout in case the ZooKeeper store gets into a deadlock/livelock situation.
+	 */
+	@Test(timeout = 50000)
+	public void testCheckpointRecovery() throws Exception {
+		final JobID jobID = new JobID();
+		final long checkpoint1Id = 1L;
+		final long checkpoint2Id = 2L;
+		final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4);
+
+		final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
+		expectedCheckpointIds.add(checkpoint1Id);
+		expectedCheckpointIds.add(checkpoint2Id);
+
+		final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
+		when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
+
+		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
+		when(retrievableStateHandle1.retrieveState()).then(
+			(invocation) -> new CompletedCheckpoint(
+				jobID,
+				checkpoint1Id,
+				1L,
+				1L,
+				new HashMap<>(),
+				null,
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+				new TestCompletedCheckpointStorageLocation()));
+
+		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class);
+		when(retrievableStateHandle2.retrieveState()).then(
+			(invocation -> new CompletedCheckpoint(
+				jobID,
+				checkpoint2Id,
+				2L,
+				2L,
+				new HashMap<>(),
+				null,
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+				new TestCompletedCheckpointStorageLocation())));
+
+		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
+		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
+		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
+		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
+
+		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
+		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
+		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
+		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
+
+		final int numCheckpointsToRetain = 1;
+
+		// Mocking for the delete operation on the CuratorFramework client
+		// It assures that the callback is executed synchronously
+
+		final EnsurePath ensurePathMock = mock(EnsurePath.class);
+		final CuratorEvent curatorEventMock = mock(CuratorEvent.class);
+		when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE);
+		when(curatorEventMock.getResultCode()).thenReturn(0);
+		when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock);
+
+		when(
+			client
+				.delete()
+				.inBackground(any(BackgroundCallback.class), any(Executor.class))
+		).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
+			@Override
+			public ErrorListenerPathable<Void> answer(InvocationOnMock invocation) throws Throwable {
+				final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0];
+
+				ErrorListenerPathable<Void> result = mock(ErrorListenerPathable.class);
+
+				when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
+					@Override
+					public Void answer(InvocationOnMock invocation) throws Throwable {
+
+						callback.processResult(client, curatorEventMock);
+
+						return null;
+					}
+				});
+
+				return result;
+			}
+		});
+
+		final String checkpointsPath = "foobar";
+		final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
+			numCheckpointsToRetain,
+			client,
+			checkpointsPath,
+			stateStorage,
+			Executors.directExecutor());
+
+		zooKeeperCompletedCheckpointStore.recover();
+
+		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
+
+		// check that we return the latest retrievable checkpoint
+		// this should remove the latest checkpoint because it is broken
+		assertEquals(checkpoint2Id, latestCompletedCheckpoint.getCheckpointID());
+
+		// this should remove the second broken checkpoint because we're iterating over all checkpoints
+		List<CompletedCheckpoint> completedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+
+		Collection<Long> actualCheckpointIds = new HashSet<>(completedCheckpoints.size());
+
+		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
+			actualCheckpointIds.add(completedCheckpoint.getCheckpointID());
+		}
+
+		assertEquals(expectedCheckpointIds, actualCheckpointIds);
+
+		// check that we did not discard any of the state handles
+		verify(retrievableStateHandle1, never()).discardState();
+		verify(retrievableStateHandle2, never()).discardState();
+
+		// Make sure that we also didn't discard any of the broken handles. Only when checkpoints
+		// are subsumed should they be discarded.
+		verify(failingRetrievableStateHandle, never()).discardState();
+	}
+
+	/**
+	 * Tests that the checkpoint does not exist in the store when we fail to add
+	 * it into the store (i.e., there exists an exception thrown by the method).
+	 */
+	@Test
+	public void testAddCheckpointWithFailedRemove() throws Exception {
+		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
+		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
+			spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
+		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
+
+		doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
+			@Override
+			public RetrievableStateHandle<CompletedCheckpoint> answer(InvocationOnMock invocationOnMock) throws Throwable {
+				CompletedCheckpoint checkpoint = (CompletedCheckpoint) invocationOnMock.getArguments()[1];
+
+				RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle = mock(RetrievableStateHandle.class);
+				when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
+
+				return retrievableStateHandle;
+			}
+		}).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class));
+
+		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString());
+
+		final int numCheckpointsToRetain = 1;
+		final String checkpointsPath = "foobar";
+		final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
+			numCheckpointsToRetain,
+			client,
+			checkpointsPath,
+			stateStorage,
+			Executors.directExecutor());
+
+		for (long i = 0; i <= numCheckpointsToRetain; ++i) {
+			CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
+			doReturn(i).when(checkpointToAdd).getCheckpointID();
+			doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
+
+			try {
+				zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
+
+				// The checkpoint should be in the store if we successfully add it into the store.
+				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+				assertTrue(addedCheckpoints.contains(checkpointToAdd));
+			} catch (Exception e) {
+				// The checkpoint should not be in the store if any exception is thrown.
+				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+				assertFalse(addedCheckpoints.contains(checkpointToAdd));
+			}
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 0384733fdb1..f992d3b00c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,60 +18,39 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
-import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.ErrorListenerPathable;
-import org.apache.curator.utils.EnsurePath;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.io.Serializable;
 import java.util.List;
-import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for {@link ZooKeeperCompletedCheckpointStore}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
 public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 
+	@ClassRule
+	public static final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
+
 	@Test
 	public void testPathConversion() {
 		final long checkpointId = 42L;
@@ -82,188 +61,105 @@ public void testPathConversion() {
 	}
 
 	/**
-	 * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
-	 * and ignores those which cannot be retrieved via their state handles.
-	 *
-	 * <p>We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation.
+	 * Tests that subsumed checkpoints are discarded.
 	 */
-	@Test(timeout = 50000)
-	public void testCheckpointRecovery() throws Exception {
-		final JobID jobID = new JobID();
-		final long checkpoint1Id = 1L;
-		final long checkpoint2Id = 2;
-		final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4);
-
-		final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
-		expectedCheckpointIds.add(1L);
-		expectedCheckpointIds.add(2L);
-
-		final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
-		when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
-
-		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
-		when(retrievableStateHandle1.retrieveState()).then(
-			(invocation) -> new CompletedCheckpoint(
-				jobID,
-				checkpoint1Id,
-				1L,
-				1L,
-				new HashMap<>(),
-				null,
-				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-				new TestCompletedCheckpointStorageLocation()));
-
-		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class);
-		when(retrievableStateHandle2.retrieveState()).then(
-			(invocation -> new CompletedCheckpoint(
-				jobID,
-				checkpoint2Id,
-				2L,
-				2L,
-				new HashMap<>(),
-				null,
-				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-				new TestCompletedCheckpointStorageLocation())));
-
-		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
-		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
-		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
-		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
-
-		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
-		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
-
-		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
-		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
-
-		final int numCheckpointsToRetain = 1;
-
-		// Mocking for the delete operation on the CuratorFramework client
-		// It assures that the callback is executed synchronously
-
-		final EnsurePath ensurePathMock = mock(EnsurePath.class);
-		final CuratorEvent curatorEventMock = mock(CuratorEvent.class);
-		when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE);
-		when(curatorEventMock.getResultCode()).thenReturn(0);
-		when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock);
-
-		when(
-			client
-				.delete()
-				.inBackground(any(BackgroundCallback.class), any(Executor.class))
-		).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
-			@Override
-			public ErrorListenerPathable<Void> answer(InvocationOnMock invocation) throws Throwable {
-				final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0];
+	@Test
+	public void testDiscardingSubsumedCheckpoints() throws Exception {
+		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
+
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperCompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client);
+
+		try {
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
+
+			checkpointStore.addCheckpoint(checkpoint1);
+			assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));
+
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint2 = CompletedCheckpointStoreTest.createCheckpoint(1, sharedStateRegistry);
+			checkpointStore.addCheckpoint(checkpoint2);
+			final List<CompletedCheckpoint> allCheckpoints = checkpointStore.getAllCheckpoints();
+			assertThat(allCheckpoints, Matchers.contains(checkpoint2));
+			assertThat(allCheckpoints, Matchers.not(Matchers.hasItem(checkpoint1)));
+
+			// verify that the subsumed checkpoint is discarded
+			CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+			// remove the remaining nodes because the ZooKeeper server is shared by all tests
+			checkpointStore.shutdown(JobStatus.FINISHED);
+		} finally {
+			client.close();
+		}
+	}
 
-				ErrorListenerPathable<Void> result = mock(ErrorListenerPathable.class);
+	/**
+	 * Tests that checkpoints are discarded when the completed checkpoint store is shut
+	 * down with a globally terminal state.
+	 */
+	@Test
+	public void testDiscardingCheckpointsAtShutDown() throws Exception {
+		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
 
-				when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
-					@Override
-					public Void answer(InvocationOnMock invocation) throws Throwable {
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperCompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client);
 
-						callback.processResult(client, curatorEventMock);
+		try {
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
 
-						return null;
-					}
-				});
+			checkpointStore.addCheckpoint(checkpoint1);
+			assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));
 
-				return result;
-			}
-		});
+			checkpointStore.shutdown(JobStatus.FINISHED);
 
-		final String checkpointsPath = "foobar";
-		final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);
+			// verify that the checkpoint is discarded
+			CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+		} finally {
+			client.close();
+		}
+	}
 
-		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-			numCheckpointsToRetain,
+	@Nonnull
+	private ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore(CuratorFramework client) throws Exception {
+		return new ZooKeeperCompletedCheckpointStore(
+			1,
 			client,
-			checkpointsPath,
-			stateStorage,
+			"/checkpoints",
+			new TestingRetrievableStateStorageHelper<>(),
 			Executors.directExecutor());
+	}
 
-		zooKeeperCompletedCheckpointStore.recover();
-
-		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
-
-		// check that we return the latest retrievable checkpoint
-		// this should remove the latest checkpoint because it is broken
-		assertEquals(checkpoint2Id, latestCompletedCheckpoint.getCheckpointID());
-
-		// this should remove the second broken checkpoint because we're iterating over all checkpoints
-		List<CompletedCheckpoint> completedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-
-		Collection<Long> actualCheckpointIds = new HashSet<>(completedCheckpoints.size());
-
-		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
-			actualCheckpointIds.add(completedCheckpoint.getCheckpointID());
+	private static final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
+		@Override
+		public RetrievableStateHandle<T> store(T state) {
+			return new TestingRetrievableStateHandle<>(state);
 		}
 
-		assertEquals(expectedCheckpointIds, actualCheckpointIds);
-
-		// check that we did not discard any of the state handles
-		verify(retrievableStateHandle1, never()).discardState();
-		verify(retrievableStateHandle2, never()).discardState();
+		private static class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
 
-		// Make sure that we also didn't discard any of the broken handles. Only when checkpoints
-		// are subsumed should they be discarded.
-		verify(failingRetrievableStateHandle, never()).discardState();
-	}
+			private static final long serialVersionUID = 137053380713794300L;
 
-	/**
-	 * Tests that the checkpoint does not exist in the store when we fail to add
-	 * it into the store (i.e., there exists an exception thrown by the method).
-	 */
-	@Test
-	public void testAddCheckpointWithFailedRemove() throws Exception {
-		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
-		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+			private final T state;
 
-		ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
-			spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
-		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
+			private TestingRetrievableStateHandle(T state) {
+				this.state = state;
+			}
 
-		doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
 			@Override
-			public RetrievableStateHandle<CompletedCheckpoint> answer(InvocationOnMock invocationOnMock) throws Throwable {
-				CompletedCheckpoint checkpoint = (CompletedCheckpoint) invocationOnMock.getArguments()[1];
-
-				RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle = mock(RetrievableStateHandle.class);
-				when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
-
-				return retrievableStateHandle;
+			public T retrieveState() throws IOException, ClassNotFoundException {
+				return state;
 			}
-		}).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class));
-
-		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(), any(ZooKeeperStateHandleStore.RemoveCallback.class));
 
-		final int numCheckpointsToRetain = 1;
-		final String checkpointsPath = "foobar";
-		final RetrievableStateStorageHelper<CompletedCheckpoint> stateSotrage = mock(RetrievableStateStorageHelper.class);
-
-		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-			numCheckpointsToRetain,
-			client,
-			checkpointsPath,
-			stateSotrage,
-			Executors.directExecutor());
+			@Override
+			public void discardState() throws Exception {
+				// no op
+			}
 
-		for (long i = 0; i <= numCheckpointsToRetain; ++i) {
-			CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
-			doReturn(i).when(checkpointToAdd).getCheckpointID();
-			doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-
-			try {
-				zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
-
-				// The checkpoint should be in the store if we successfully add it into the store.
-				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-				assertTrue(addedCheckpoints.contains(checkpointToAdd));
-			} catch (Exception e) {
-				// The checkpoint should not be in the store if any exception is thrown.
-				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-				assertFalse(addedCheckpoints.contains(checkpointToAdd));
+			@Override
+			public long getStateSize() {
+				return 0;
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 2c030d24c49..cb26f4862b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -30,9 +30,11 @@
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -41,6 +43,7 @@
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -110,8 +113,6 @@ public static void teardownClass() throws ExecutionException, InterruptedExcepti
 	 */
 	@Test
 	public void testGrantingRevokingLeadership() throws Exception {
-
-		final Configuration configuration = new Configuration();
 		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		final JobGraph nonEmptyJobGraph = createNonEmptyJobGraph();
 		final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(nonEmptyJobGraph, null);
@@ -124,7 +125,34 @@ public void testGrantingRevokingLeadership() throws Exception {
 
 		final BlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
 
-		final HATestingDispatcher dispatcher = new HATestingDispatcher(
+		final HATestingDispatcher dispatcher = createHADispatcher(highAvailabilityServices, fencingTokens);
+
+		dispatcher.start();
+
+		try {
+			final UUID leaderId = UUID.randomUUID();
+			dispatcherLeaderElectionService.isLeader(leaderId);
+
+			dispatcherLeaderElectionService.notLeader();
+
+			final DispatcherId firstFencingToken = fencingTokens.take();
+
+			assertThat(firstFencingToken, equalTo(NULL_FENCING_TOKEN));
+
+			enterGetJobIdsLatch.await();
+			proceedGetJobIdsLatch.trigger();
+
+			assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
+
+		} finally {
+			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+		}
+	}
+
+	@Nonnull
+	private HATestingDispatcher createHADispatcher(TestingHighAvailabilityServices highAvailabilityServices, BlockingQueue<DispatcherId> fencingTokens) throws Exception {
+		final Configuration configuration = new Configuration();
+		return new HATestingDispatcher(
 			rpcService,
 			UUID.randomUUID().toString(),
 			configuration,
@@ -138,33 +166,63 @@ public void testGrantingRevokingLeadership() throws Exception {
 			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
 			testingFatalErrorHandler,
 			fencingTokens);
+	}
+
+	/**
+	 * Tests that all JobManagerRunners are terminated if the leadership of the
+	 * Dispatcher is revoked.
+	 */
+	@Test
+	public void testRevokeLeadershipTerminatesJobManagerRunners() throws Exception {
+
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		highAvailabilityServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore());
+
+		final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+		final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
+		final HATestingDispatcher dispatcher = createHADispatcher(
+			highAvailabilityServices,
+			fencingTokens);
 
 		dispatcher.start();
 
 		try {
-			final UUID leaderId = UUID.randomUUID();
-			dispatcherLeaderElectionService.isLeader(leaderId);
+			// grant leadership and submit a single job
+			final DispatcherId expectedDispatcherId = DispatcherId.generate();
 
-			dispatcherLeaderElectionService.notLeader();
+			leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
 
-			final DispatcherId firstFencingToken = fencingTokens.take();
+			assertThat(fencingTokens.take(), is(equalTo(expectedDispatcherId)));
 
-			assertThat(firstFencingToken, equalTo(NULL_FENCING_TOKEN));
+			final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-			enterGetJobIdsLatch.await();
-			proceedGetJobIdsLatch.trigger();
+			final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(createNonEmptyJobGraph(), timeout);
 
-			assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
+			submissionFuture.get();
+
+			assertThat(dispatcher.getNumberJobs(timeout).get(), is(1));
+
+			// revoke the leadership --> this should stop all running JobManagerRunners
+			leaderElectionService.notLeader();
+
+			assertThat(fencingTokens.take(), is(equalTo(NULL_FENCING_TOKEN)));
 
+			assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
 		} finally {
 			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
 		}
 	}
 
 	@Nonnull
-	private JobGraph createNonEmptyJobGraph() {
+	public static JobGraph createNonEmptyJobGraph() {
 		final JobVertex noOpVertex = new JobVertex("NoOp vertex");
-		return new JobGraph(noOpVertex);
+		noOpVertex.setInvokableClass(NoOpInvokable.class);
+		final JobGraph jobGraph = new JobGraph(noOpVertex);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		return jobGraph;
 	}
 
 	private static class HATestingDispatcher extends TestingDispatcher {
@@ -243,6 +301,11 @@ public void removeJobGraph(JobID jobId) throws Exception {
 			throw new UnsupportedOperationException("Should not be called.");
 		}
 
+		@Override
+		public void releaseJobGraph(JobID jobId) throws Exception {
+			throw new UnsupportedOperationException("Should not be called.");
+		}
+
 		@Override
 		public Collection<JobID> getJobIds() throws Exception {
 			enterGetJobIdsLatch.trigger();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java
new file mode 100644
index 00000000000..493534dd93c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+
+/**
+ * No-op {@link org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}
+ * implementation for testing purposes.
+ */
+public enum NoOpSubmittedJobGraphListener implements SubmittedJobGraphStore.SubmittedJobGraphListener {
+	INSTANCE;
+
+	@Override
+	public void onAddedJobGraph(JobID jobId) {
+		// No op
+	}
+
+	@Override
+	public void onRemovedJobGraph(JobID jobId) {
+		// No op
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index f5091ea5b10..5141be039f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -29,8 +31,12 @@
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
 /**
  * {@link Dispatcher} implementation used for testing purposes.
  */
@@ -72,4 +78,11 @@ void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
 		runAsync(
 			() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
 	}
+
+	@VisibleForTesting
+	public CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
+		return callAsyncWithoutFencing(
+			() -> getJobTerminationFuture(jobId),
+			timeout).thenCompose(Function.identity());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
new file mode 100644
index 00000000000..dd0375886a9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test cases for the interaction between ZooKeeper HA and the {@link Dispatcher}.
+ */
+public class ZooKeeperHADispatcherTest extends TestLogger {
+
+	private static final Time TIMEOUT = Time.seconds(10L);
+
+	@ClassRule
+	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static Configuration configuration;
+
+	private static TestingRpcService rpcService;
+
+	private static BlobServer blobServer;
+
+	@Rule
+	public TestName name = new TestName();
+
+	private TestingFatalErrorHandler testingFatalErrorHandler;
+
+	@BeforeClass
+	public static void setupClass() throws IOException {
+		configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+		rpcService = new TestingRpcService();
+		blobServer = new BlobServer(configuration, new VoidBlobStore());
+	}
+
+	@Before
+	public void setup() {
+		testingFatalErrorHandler = new TestingFatalErrorHandler();
+	}
+
+	@After
+	public void teardown() throws Exception {
+		if (testingFatalErrorHandler != null) {
+			testingFatalErrorHandler.rethrowError();
+		}
+	}
+
+	@AfterClass
+	public static void teardownClass() throws Exception {
+		if (rpcService != null) {
+			RpcUtils.terminateRpcService(rpcService, TIMEOUT);
+			rpcService = null;
+		}
+
+		if (blobServer != null) {
+			blobServer.close();
+			blobServer = null;
+		}
+	}
+
+	/**
+	 * Tests that the {@link Dispatcher} releases a locked {@link SubmittedJobGraph} once it
+	 * loses the leadership.
+	 */
+	@Test
+	public void testSubmittedJobGraphRelease() throws Exception {
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration);
+
+		try (final TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices()) {
+			testingHighAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration));
+
+			final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(
+				otherClient,
+				configuration);
+
+			otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
+
+			final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+			testingHighAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+			final TestingDispatcher dispatcher = createDispatcher(testingHighAvailabilityServices);
+
+			dispatcher.start();
+
+			try {
+				final DispatcherId expectedLeaderId = DispatcherId.generate();
+				leaderElectionService.isLeader(expectedLeaderId.toUUID()).get();
+
+				final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+				final JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
+				final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(nonEmptyJobGraph, TIMEOUT);
+				submissionFuture.get();
+
+				Collection<JobID> jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+				final JobID jobId = nonEmptyJobGraph.getJobID();
+				assertThat(jobIds, Matchers.contains(jobId));
+
+				leaderElectionService.notLeader();
+
+				// wait for the job to properly terminate
+				final CompletableFuture<Void> jobTerminationFuture = dispatcher.getJobTerminationFuture(jobId, TIMEOUT);
+				jobTerminationFuture.get();
+
+				// recover the job
+				final SubmittedJobGraph submittedJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobId);
+
+				assertThat(submittedJobGraph, Matchers.is(Matchers.notNullValue()));
+
+				// check that the other submitted job graph store can remove the job graph after the original leader
+				// has lost its leadership
+				otherSubmittedJobGraphStore.removeJobGraph(jobId);
+
+				jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+				assertThat(jobIds, Matchers.not(Matchers.contains(jobId)));
+			} finally {
+				RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+				client.close();
+				otherClient.close();
+			}
+		}
+	}
+
+	@Nonnull
+	private TestingDispatcher createDispatcher(TestingHighAvailabilityServices testingHighAvailabilityServices) throws Exception {
+		return new TestingDispatcher(
+			rpcService,
+			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
+			configuration,
+			testingHighAvailabilityServices,
+			new TestingResourceManagerGateway(),
+			blobServer,
+			new HeartbeatServices(1000L, 1000L),
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+			null,
+			new MemoryArchivedExecutionGraphStore(),
+			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
+			testingFatalErrorHandler);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 79e6d20348a..8becb05ae91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -32,8 +32,8 @@
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
@@ -151,7 +151,6 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Mockito.mock;
 
 public class JobManagerTest extends TestLogger {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
new file mode 100644
index 00000000000..8e5b1b9b392
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherHATest;
+import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+	@ClassRule
+	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS);
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		final Future<Terminated> terminationFuture = system.terminate();
+		Await.ready(terminationFuture, TIMEOUT);
+	}
+
+	/**
+	 * Tests that the {@link JobManager} releases all locked {@link JobGraph JobGraphs} once it
+	 * loses leadership.
+	 */
+	@Test
+	public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+		try (TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices()) {
+
+			final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+			final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+			highAvailabilityServices.setJobMasterLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID, leaderElectionService);
+			highAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration));
+			highAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+
+			final CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration);
+			final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(otherClient, configuration);
+			otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
+
+			ActorRef jobManagerActorRef = null;
+			try {
+				jobManagerActorRef = JobManager.startJobManagerActors(
+					configuration,
+					system,
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
+					highAvailabilityServices,
+					NoOpMetricRegistry.INSTANCE,
+					Option.empty(),
+					TestingJobManager.class,
+					MemoryArchivist.class)._1();
+
+				waitForActorToBeStarted(jobManagerActorRef, TIMEOUT);
+
+				final ActorGateway jobManager = new AkkaActorGateway(jobManagerActorRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+				leaderElectionService.isLeader(HighAvailabilityServices.DEFAULT_LEADER_ID).get();
+
+				final JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
+
+				final JobManagerMessages.SubmitJob submitJobMessage = new JobManagerMessages.SubmitJob(nonEmptyJobGraph, ListeningBehaviour.DETACHED);
+
+				Await.result(jobManager.ask(submitJobMessage, TIMEOUT), TIMEOUT);
+
+				Collection<JobID> jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+				final JobID jobId = nonEmptyJobGraph.getJobID();
+				assertThat(jobIds, contains(jobId));
+
+				// revoke the leadership
+				leaderElectionService.notLeader();
+
+				Await.result(jobManager.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(), TIMEOUT), TIMEOUT);
+
+				final SubmittedJobGraph recoveredJobGraph = akka.serialization.JavaSerializer.currentSystem().withValue(
+					((ExtendedActorSystem) system),
+					() -> otherSubmittedJobGraphStore.recoverJobGraph(jobId));
+
+				assertThat(recoveredJobGraph, is(notNullValue()));
+
+				otherSubmittedJobGraphStore.removeJobGraph(jobId);
+
+				jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+				assertThat(jobIds, not(contains(jobId)));
+			} finally {
+				client.close();
+				otherClient.close();
+
+				if (jobManagerActorRef != null) {
+					ActorUtils.stopActor(jobManagerActorRef);
+				}
+			}
+		}
+	}
+
+	private void waitForActorToBeStarted(ActorRef jobManagerActorRef, FiniteDuration timeout) throws InterruptedException, java.util.concurrent.TimeoutException {
+		Await.ready(Patterns.ask(jobManagerActorRef, new Identify(42), timeout.toMillis()), timeout);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index c1a7b536721..e9be145c37f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -20,7 +20,6 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
@@ -90,8 +89,7 @@ public void testPutAndRemoveJobGraph() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
 			ZooKeeper.createClient(),
 			"/testPutAndRemoveJobGraph",
-			localStateStorage,
-			Executors.directExecutor());
+			localStateStorage);
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -147,7 +145,7 @@ public void testPutAndRemoveJobGraph() throws Exception {
 	@Test
 	public void testRecoverJobGraphs() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -198,10 +196,10 @@ public void testConcurrentAddJobGraph() throws Exception {
 
 		try {
 			jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
 
 			otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
 
 
 			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
@@ -257,10 +255,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
 	@Test(expected = IllegalStateException.class)
 	public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
 		ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
 		jobGraphs.start(null);
 		otherJobGraphs.start(null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index ba0dc80fbb5..3b9c5786ca4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -96,6 +96,11 @@ public synchronized void removeJobGraph(JobID jobId) throws Exception {
 		storedJobs.remove(jobId);
 	}
 
+	@Override
+	public void releaseJobGraph(JobID jobId) {
+		verifyIsStarted();
+	}
+
 	@Override
 	public synchronized Collection<JobID> getJobIds() throws Exception {
 		verifyIsStarted();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
new file mode 100644
index 00000000000..c4c56949cd9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * {@link ExternalResource} which starts a {@link org.apache.zookeeper.server.ZooKeeperServer}.
+ */
+public class ZooKeeperResource extends ExternalResource {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperResource.class);
+
+	@Nullable
+	private TestingServer zooKeeperServer;
+
+	public String getConnectString() {
+		verifyIsRunning();
+		return zooKeeperServer.getConnectString();
+	}
+
+	private void verifyIsRunning() {
+		Preconditions.checkState(zooKeeperServer != null);
+	}
+
+	@Override
+	protected void before() throws Throwable {
+		terminateZooKeeperServer();
+		zooKeeperServer = new TestingServer(true);
+	}
+
+	private void terminateZooKeeperServer() throws IOException {
+		if (zooKeeperServer != null) {
+			zooKeeperServer.stop();
+			zooKeeperServer = null;
+		}
+	}
+
+	@Override
+	protected void after() {
+		try {
+			terminateZooKeeperServer();
+		} catch (IOException e) {
+			LOG.warn("Could not properly terminate the {}.", getClass().getSimpleName(), e);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index fd39b25991c..2dd27e7c897 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -18,21 +18,19 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,7 +39,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -49,12 +46,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -88,8 +80,8 @@ public void cleanUp() throws Exception {
 	@Test
 	public void testAddAndLock() throws Exception {
 		LongStateStorage longStateStorage = new LongStateStorage();
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZOOKEEPER.getClient(), longStateStorage, Executors.directExecutor());
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+			ZOOKEEPER.getClient(), longStateStorage);
 
 		// Config
 		final String pathInZooKeeper = "/testAdd";
@@ -136,7 +128,7 @@ public void testAddAlreadyExistingPath() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath");
 
@@ -161,7 +153,7 @@ public void testAddDiscardStateHandleAfterFailure() throws Exception {
 		when(client.inTransaction().create()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
+				client, stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
@@ -191,7 +183,7 @@ public void testReplace() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testReplace";
@@ -230,7 +222,7 @@ public void testReplaceNonExistingPath() throws Exception {
 		RetrievableStateStorageHelper<Long> stateStorage = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateStorage, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateStorage);
 
 		store.replace("/testReplaceNonExistingPath", 0, 1L);
 	}
@@ -247,7 +239,7 @@ public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
 		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
+				client, stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
@@ -289,7 +281,7 @@ public void testGetAndExists() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testGetAndExists";
@@ -314,7 +306,7 @@ public void testGetNonExistingPath() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		store.getAndLock("/testGetNonExistingPath");
 	}
@@ -328,7 +320,7 @@ public void testGetAll() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testGetAll";
@@ -359,7 +351,7 @@ public void testGetAllSortedByName() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String basePath = "/testGetAllSortedByName";
@@ -393,7 +385,7 @@ public void testRemove() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testRemove";
@@ -401,50 +393,14 @@ public void testRemove() throws Exception {
 
 		store.addAndLock(pathInZooKeeper, state);
 
+		final int numberOfGlobalDiscardCalls = LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls();
+
 		// Test
 		store.releaseAndTryRemove(pathInZooKeeper);
 
 		// Verify discarded
 		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
-	}
-
-	/**
-	 * Tests that state handles are correctly removed with a callback.
-	 */
-	@Test
-	public void testRemoveWithCallback() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testRemoveWithCallback";
-		final Long state = 27255442L;
-
-		store.addAndLock(pathInZooKeeper, state);
-
-		final CountDownLatch sync = new CountDownLatch(1);
-		ZooKeeperStateHandleStore.RemoveCallback<Long> callback = mock(ZooKeeperStateHandleStore.RemoveCallback.class);
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				sync.countDown();
-				return null;
-			}
-		}).when(callback).apply(any(RetrievableStateHandle.class));
-
-		// Test
-		store.releaseAndTryRemove(pathInZooKeeper, callback);
-
-		// Verify discarded and callback called
-		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
-
-		sync.await();
-
-		verify(callback, times(1))
-				.apply(any(RetrievableStateHandle.class));
+		assertEquals(numberOfGlobalDiscardCalls + 1, LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls());
 	}
 
 	/** Tests that all state handles are correctly discarded. */
@@ -454,7 +410,7 @@ public void testReleaseAndTryRemoveAll() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testDiscardAll";
@@ -486,8 +442,7 @@ public void testCorruptedData() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			stateStorage,
-			Executors.directExecutor());
+			stateStorage);
 
 		final Collection<Long> input = new HashSet<>();
 		input.add(1L);
@@ -543,13 +498,11 @@ public void testConcurrentDeleteOperation() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String statePath = "/state";
 
@@ -586,13 +539,11 @@ public void testLockCleanupWhenGetAndLockFails() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String path = "/state";
 
@@ -649,8 +600,7 @@ public void testLockCleanupWhenClientTimesOut() throws Exception {
 
 			ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 				client,
-				longStateStorage,
-				Executors.directExecutor());
+				longStateStorage);
 
 			final String path = "/state";
 
@@ -682,8 +632,7 @@ public void testRelease() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String path = "/state";
 
@@ -720,8 +669,7 @@ public void testReleaseAll() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final Collection<String> paths = Arrays.asList("/state1", "/state2", "/state3");
 
@@ -775,9 +723,11 @@ public void testReleaseAll() throws Exception {
 
 		private static final long serialVersionUID = -3555329254423838912L;
 
+		private static int numberOfGlobalDiscardCalls = 0;
+
 		private final Long state;
 
-		private int numberOfDiscardCalls;
+		private int numberOfDiscardCalls = 0;
 
 		public LongRetrievableStateHandle(Long state) {
 			this.state = state;
@@ -790,6 +740,7 @@ public Long retrieveState() {
 
 		@Override
 		public void discardState() throws Exception {
+			numberOfGlobalDiscardCalls++;
 			numberOfDiscardCalls++;
 		}
 
@@ -798,8 +749,12 @@ public long getStateSize() {
 			return 0;
 		}
 
-		public int getNumberOfDiscardCalls() {
+		int getNumberOfDiscardCalls() {
 			return numberOfDiscardCalls;
 		}
+
+		public static int getNumberOfGlobalDiscardCalls() {
+			return numberOfGlobalDiscardCalls;
+		}
 	}
 }
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 0640f39f4cf..ebe46399395 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -454,6 +454,14 @@ trait TestingJobManagerLike extends FlinkActor {
         val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
         receiver ! Acknowledge.get()
       }
+
+    case WaitForBackgroundTasksToFinish =>
+      val future = futuresToComplete match {
+        case Some(futures) => Future.sequence(futures)
+        case None => Future.successful(Seq())
+      }
+
+      future.pipeTo(sender())
   }
 
   def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index c8529a9e07a..64af056f24d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -59,6 +59,8 @@ object TestingJobManagerMessages {
 
   case object NotifyListeners
 
+  case object WaitForBackgroundTasksToFinish
+
   case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
   case class TaskManagerTerminated(taskManager: ActorRef)
 
@@ -164,4 +166,5 @@ object TestingJobManagerMessages {
   def getClientConnected(): AnyRef = ClientConnected
   def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
 
+  def getWaitForBackgroundTasksToFinish(): AnyRef = WaitForBackgroundTasksToFinish
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services