You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/12 14:52:43 UTC

[GitHub] tillrohrmann closed pull request #6590: [Backport 1.4][FLINK-10011] Release JobGraph from SubmittedJobGraphStore

tillrohrmann closed pull request #6590: [Backport 1.4][FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6590
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub would not otherwise display it):

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb833a3a..45d11412c50 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Exec
 
 		ZooKeeperStateHandleStore<MesosWorkerStore.Worker> zooKeeperStateHandleStore = zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
 			"/workers",
-			stateStorageHelper,
-			executor);
+			stateStorageHelper);
 
 		ZooKeeperSharedValue frameworkId = zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
 		ZooKeeperSharedCount totalTaskCount = zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index f22127041d3..533026041f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -31,8 +31,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +84,8 @@
 	 */
 	private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
+	private final Executor executor;
+
 	/**
 	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 	 *
@@ -98,7 +98,7 @@
 	 *                                       start with a '/')
 	 * @param stateStorage                   State storage to be used to persist the completed
 	 *                                       checkpoint
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
+	 * @param executor to execute blocking calls
 	 * @throws Exception
 	 */
 	public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +123,12 @@ public ZooKeeperCompletedCheckpointStore(
 		// All operations will have the path as root
 		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
 
-		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
+		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);
 
 		this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 
+		this.executor = checkNotNull(executor);
+
 		LOG.info("Initialized in '{}'.", checkpointsPath);
 	}
 
@@ -237,7 +239,18 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
 		// Everything worked, let's remove a previous checkpoint if necessary.
 		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
 			try {
-				removeSubsumed(completedCheckpoints.removeFirst());
+				final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
+
+				if (tryRemove(completedCheckpoint.getCheckpointID())) {
+					executor.execute(() -> {
+						try {
+							completedCheckpoint.discardOnSubsume();
+						} catch (Exception e) {
+							LOG.warn("Could not discard subsumed completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
+						}
+					});
+
+				}
 			} catch (Exception e) {
 				LOG.warn("Failed to subsume the old checkpoint", e);
 			}
@@ -279,7 +292,17 @@ public void shutdown(JobStatus jobStatus) throws Exception {
 
 			for (CompletedCheckpoint checkpoint : completedCheckpoints) {
 				try {
-					removeShutdown(checkpoint, jobStatus);
+					if (tryRemove(checkpoint.getCheckpointID())) {
+						executor.execute(
+							() -> {
+								try {
+									checkpoint.discardOnShutdown(jobStatus);
+								} catch (Exception e) {
+									LOG.warn("Could not discard completed checkpoint {} on shutdown.", checkpoint.getCheckpointID(), e);
+								}
+							}
+						);
+					}
 				} catch (Exception e) {
 					LOG.error("Failed to discard checkpoint.", e);
 				}
@@ -305,59 +328,13 @@ public void shutdown(JobStatus jobStatus) throws Exception {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Removes a subsumed checkpoint from ZooKeeper and drops the state.
-	 */
-	private void removeSubsumed(
-		final CompletedCheckpoint completedCheckpoint) throws Exception {
-
-		if (completedCheckpoint == null) {
-			return;
-		}
-
-		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action =
-			new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-				@Override
-				public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-					if (value != null) {
-						try {
-							completedCheckpoint.discardOnSubsume();
-						} catch (Exception e) {
-							throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
-						}
-					}
-				}
-			};
-
-		checkpointsInZooKeeper.releaseAndTryRemove(
-			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-			action);
-	}
-
-	/**
-	 * Removes a checkpoint from ZooKeeper because of Job shutdown and drops the state.
+	 * Tries to remove the checkpoint identified by the given checkpoint id.
+	 *
+	 * @param checkpointId identifying the checkpoint to remove
+	 * @return true if the checkpoint could be removed
 	 */
-	private void removeShutdown(
-			final CompletedCheckpoint completedCheckpoint,
-			final JobStatus jobStatus) throws Exception {
-
-		if (completedCheckpoint == null) {
-			return;
-		}
-
-		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-			@Override
-			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-				try {
-					completedCheckpoint.discardOnShutdown(jobStatus);
-				} catch (Exception e) {
-					throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
-				}
-			}
-		};
-
-		checkpointsInZooKeeper.releaseAndTryRemove(
-			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-			removeAction);
+	private boolean tryRemove(long checkpointId) throws Exception {
+		return checkpointsInZooKeeper.releaseAndTryRemove(checkpointIdToPath(checkpointId));
 	}
 
 	/**
@@ -381,7 +358,7 @@ public static long pathToCheckpointId(String path) {
 			String numberString;
 
 			// check if we have a leading slash
-			if ('/' == path.charAt(0) ) {
+			if ('/' == path.charAt(0)) {
 				numberString = path.substring(1);
 			} else {
 				numberString = path;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 04ab6d3cc96..b5822c5590a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -163,7 +163,7 @@ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
 
 	@Override
 	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
-		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
+		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index d1ca1a38853..f28621f0d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -43,22 +43,27 @@ public void stop() {
 	}
 
 	@Override
-	public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+	public void putJobGraph(SubmittedJobGraph jobGraph) {
 		// Nothing to do
 	}
 
 	@Override
-	public void removeJobGraph(JobID jobId) throws Exception {
+	public void removeJobGraph(JobID jobId) {
 		// Nothing to do
 	}
 
 	@Override
-	public Collection<JobID> getJobIds() throws Exception {
+	public void releaseJobGraph(JobID jobId) {
+		// nothing to do
+	}
+
+	@Override
+	public Collection<JobID> getJobIds() {
 		return Collections.emptyList();
 	}
 
 	@Override
-	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+	public SubmittedJobGraph recoverJobGraph(JobID jobId) {
 		return null;
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index 6e91f80ebeb..b622823e7b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import java.util.Collection;
 
@@ -56,6 +57,17 @@
 	 */
 	void removeJobGraph(JobID jobId) throws Exception;
 
+	/**
+	 * Releases the locks on the specified {@link JobGraph}.
+	 *
+	 * <p>Releasing the locks allows another instance to delete the job from
+	 * the {@link SubmittedJobGraphStore}.
+	 *
+	 * @param jobId specifying the job to release the locks for
+	 * @throws Exception if the locks cannot be released
+	 */
+	void releaseJobGraph(JobID jobId) throws Exception;
+
 	/**
 	 * Get all job ids of submitted job graphs to the submitted job graph store.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index f31c970a8dc..3ac4f25b709 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -38,7 +38,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -66,13 +65,13 @@
 	/** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
 	private final Object cacheLock = new Object();
 
-	/** Client (not a namespace facade) */
+	/** Client (not a namespace facade). */
 	private final CuratorFramework client;
 
 	/** The set of IDs of all added job graphs. */
 	private final Set<JobID> addedJobGraphs = new HashSet<>();
 
-	/** Completed checkpoints in ZooKeeper */
+	/** Submitted job graphs in ZooKeeper. */
 	private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
 
 	/**
@@ -91,19 +90,17 @@
 	private boolean isRunning;
 
 	/**
-	 * Submitted job graph store backed by ZooKeeper
+	 * Submitted job graph store backed by ZooKeeper.
 	 *
 	 * @param client ZooKeeper client
 	 * @param currentJobsPath ZooKeeper path for current job graphs
 	 * @param stateStorage State storage used to persist the submitted jobs
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
 	 * @throws Exception
 	 */
 	public ZooKeeperSubmittedJobGraphStore(
 			CuratorFramework client,
 			String currentJobsPath,
-			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage,
-			Executor executor) throws Exception {
+			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
 
 		checkNotNull(currentJobsPath, "Current jobs path");
 		checkNotNull(stateStorage, "State storage");
@@ -120,7 +117,7 @@ public ZooKeeperSubmittedJobGraphStore(
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
 		this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
-		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
+		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
 		pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
@@ -272,6 +269,24 @@ public void removeJobGraph(JobID jobId) throws Exception {
 		LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 	}
 
+	@Override
+	public void releaseJobGraph(JobID jobId) throws Exception {
+		checkNotNull(jobId, "Job ID");
+		final String path = getPathForJob(jobId);
+
+		LOG.debug("Releasing locks of job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
+		synchronized (cacheLock) {
+			if (addedJobGraphs.contains(jobId)) {
+				jobGraphsInZooKeeper.release(path);
+
+				addedJobGraphs.remove(jobId);
+			}
+		}
+
+		LOG.info("Released locks of job graph {} from ZooKeeper.", jobId);
+	}
+
 	@Override
 	public Collection<JobID> getJobIds() throws Exception {
 		Collection<String> paths;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index a7ac500a1d7..bd7b74ce1d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.runtime.util;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.ACLProvider;
-import org.apache.curator.framework.imps.DefaultACLProvider;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -41,6 +35,13 @@
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
 import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
@@ -53,6 +54,9 @@
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * Class containing helper functions to interact with ZooKeeper.
+ */
 public class ZooKeeperUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
@@ -92,14 +96,14 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
 
 		ZkClientACLMode aclMode = ZkClientACLMode.fromConfig(configuration);
 
-		if(disableSaslClient && aclMode == ZkClientACLMode.CREATOR) {
-			String errorMessage = "Cannot set ACL role to " + aclMode +"  since SASL authentication is " +
+		if (disableSaslClient && aclMode == ZkClientACLMode.CREATOR) {
+			String errorMessage = "Cannot set ACL role to " + aclMode + "  since SASL authentication is " +
 					"disabled through the " + SecurityOptions.ZOOKEEPER_SASL_DISABLE.key() + " property";
 			LOG.warn(errorMessage);
 			throw new IllegalConfigurationException(errorMessage);
 		}
 
-		if(aclMode == ZkClientACLMode.CREATOR) {
+		if (aclMode == ZkClientACLMode.CREATOR) {
 			LOG.info("Enforcing creator for ZK connections");
 			aclProvider = new SecureAclProvider();
 		} else {
@@ -107,7 +111,6 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
 			aclProvider = new DefaultACLProvider();
 		}
 
-
 		String rootWithNamespace = generateZookeeperPath(root, namespace);
 
 		LOG.info("Using '{}' as Zookeeper namespace.", rootWithNamespace);
@@ -164,8 +167,7 @@ public static String getZooKeeperEnsemble(Configuration flinkConf)
 	 */
 	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
 		final CuratorFramework client,
-		final Configuration configuration) throws Exception
-	{
+		final Configuration configuration) throws Exception {
 		return createLeaderRetrievalService(client, configuration, "");
 	}
 
@@ -181,8 +183,7 @@ public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
 	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
 		final CuratorFramework client,
 		final Configuration configuration,
-		final String pathSuffix)
-	{
+		final String pathSuffix) {
 		String leaderPath = configuration.getString(
 			HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
 
@@ -214,8 +215,7 @@ public static ZooKeeperLeaderElectionService createLeaderElectionService(
 	public static ZooKeeperLeaderElectionService createLeaderElectionService(
 		final CuratorFramework client,
 		final Configuration configuration,
-		final String pathSuffix)
-	{
+		final String pathSuffix) {
 		final String latchPath = configuration.getString(
 			HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;
 		final String leaderPath = configuration.getString(
@@ -229,14 +229,12 @@ public static ZooKeeperLeaderElectionService createLeaderElectionService(
 	 *
 	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 	 * @param configuration {@link Configuration} object
-	 * @param executor to run ZooKeeper callbacks
 	 * @return {@link ZooKeeperSubmittedJobGraphStore} instance
 	 * @throws Exception if the submitted job graph store cannot be created
 	 */
 	public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
 			CuratorFramework client,
-			Configuration configuration,
-			Executor executor) throws Exception {
+			Configuration configuration) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -246,7 +244,9 @@ public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
 		String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
-				client, zooKeeperSubmittedJobsPath, stateStorage, executor);
+			client,
+			zooKeeperSubmittedJobsPath,
+			stateStorage);
 	}
 
 	/**
@@ -346,22 +346,24 @@ public static String generateZookeeperPath(String root, String namespace) {
 		return root + namespace;
 	}
 
-
-	public static class SecureAclProvider implements ACLProvider
-	{
+	/**
+	 * Secure {@link ACLProvider} implementation.
+	 */
+	public static class SecureAclProvider implements ACLProvider {
 		@Override
-		public List<ACL> getDefaultAcl()
-		{
+		public List<ACL> getDefaultAcl() {
 			return ZooDefs.Ids.CREATOR_ALL_ACL;
 		}
 
 		@Override
-		public List<ACL> getAclForPath(String path)
-		{
+		public List<ACL> getAclForPath(String path) {
 			return ZooDefs.Ids.CREATOR_ALL_ACL;
 		}
 	}
 
+	/**
+	 * ZooKeeper client ACL mode enum.
+	 */
 	public enum ZkClientACLMode {
 		CREATOR,
 		OPEN;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index f0d67fd2a87..f266e36471c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -18,17 +18,13 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -36,6 +32,7 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -44,7 +41,6 @@
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -68,13 +64,13 @@
  * State handle in ZooKeeper =&gt; State handle exists
  * </pre>
  *
- * But not:
+ * <p>But not:
  *
  * <pre>
  * State handle exists =&gt; State handle in ZooKeeper
  * </pre>
  *
- * There can be lingering state handles when failures happen during operation. They
+ * <p>There can be lingering state handles when failures happen during operation. They
  * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
  * FLINK-2513</a> about a possible way to overcome this).
  *
@@ -84,13 +80,11 @@
 
 	public static Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-	/** Curator ZooKeeper client */
+	/** Curator ZooKeeper client. */
 	private final CuratorFramework client;
 
 	private final RetrievableStateStorageHelper<T> storage;
 
-	private final Executor executor;
-
 	/** Lock node name of this ZooKeeperStateHandleStore. The name should be unique among all other state handle stores. */
 	private final String lockNode;
 
@@ -103,16 +97,13 @@
 	 *                            instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
 	 * @param storage to persist the actual state and whose returned state handle is then written
 	 *                to ZooKeeper
-	 * @param executor to run the ZooKeeper callbacks
 	 */
 	public ZooKeeperStateHandleStore(
 		CuratorFramework client,
-		RetrievableStateStorageHelper<T> storage,
-		Executor executor) {
+		RetrievableStateStorageHelper<T> storage) {
 
 		this.client = checkNotNull(client, "Curator client");
 		this.storage = checkNotNull(storage, "State storage");
-		this.executor = checkNotNull(executor);
 
 		// Generate a unique lock node name
 		lockNode = UUID.randomUUID().toString();
@@ -262,7 +253,7 @@ public int exists(String pathInZooKeeper) throws Exception {
 	public Collection<String> getAllPaths() throws Exception {
 		final String path = "/";
 
-		while(true) {
+		while (true) {
 			Stat stat = client.checkExists().forPath(path);
 
 			if (stat == null) {
@@ -393,33 +384,14 @@ public int exists(String pathInZooKeeper) throws Exception {
 
 	/**
 	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
-	 * The deletion of the state node is executed asynchronously.
-	 *
-	 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
-	 * has been executed.
-	 *
-	 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
-	 * @throws Exception If the ZooKeeper operation fails
-	 */
-	public void releaseAndTryRemove(String pathInZooKeeper) throws Exception {
-		releaseAndTryRemove(pathInZooKeeper, null);
-	}
-
-	/**
-	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
-	 * The deletion of the state node is executed asynchronously. After the state node has been deleted, the given
-	 * callback is called with the {@link RetrievableStateHandle} of the deleted state node.
-	 *
-	 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
-	 * has been executed.
+	 * Returns false if the state node is still locked; otherwise deletes it and discards its retrievable state handle, if any.
 	 *
 	 * @param pathInZooKeeper Path of state handle to remove
-	 * @param callback The callback to execute after a successful deletion. Null if no action needs to be executed.
-	 * @throws Exception If the ZooKeeper operation fails
+	 * @return True if the state node could be removed, false if it is still locked by another instance
+	 * @throws Exception If the ZooKeeper operation or discarding the state handle fails
 	 */
-	public void releaseAndTryRemove(
-			String pathInZooKeeper,
-			@Nullable final RemoveCallback<T> callback) throws Exception {
+	@Nullable
+	public boolean releaseAndTryRemove(String pathInZooKeeper) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
 		final String path = normalizePath(pathInZooKeeper);
@@ -429,14 +401,23 @@ public void releaseAndTryRemove(
 		try {
 			stateHandle = get(path, false);
 		} catch (Exception e) {
-			LOG.warn("Could not retrieve the state handle from node " + path + '.', e);
+			LOG.warn("Could not retrieve the state handle from node {}.", path, e);
 		}
 
 		release(pathInZooKeeper);
 
-		final BackgroundCallback backgroundCallback = new RemoveBackgroundCallback<>(stateHandle, callback, path);
+		try {
+			client.delete().forPath(path);
+		} catch (KeeperException.NotEmptyException ignored) {
+			LOG.debug("Could not delete znode {} because it is still locked.", path);
+			return false;
+		}
+
+		if (stateHandle != null) {
+			stateHandle.discardState();
+		}
 
-		client.delete().inBackground(backgroundCallback, executor).forPath(path);
+		return true;
 	}
 
 	/**
@@ -583,7 +564,7 @@ protected String getLockPath(String rootPath) {
 	}
 
 	/**
-	 * Makes sure that every path starts with a "/"
+	 * Makes sure that every path starts with a "/".
 	 *
 	 * @param path Path to normalize
 	 * @return Normalized path such that it starts with a "/"
@@ -595,103 +576,4 @@ private static String normalizePath(String path) {
 			return '/' + path;
 		}
 	}
-
-	// ---------------------------------------------------------------------------------------------------------
-	// Utility classes
-	// ---------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Callback which is executed when removing a node from ZooKeeper. The callback will call the given
-	 * {@link RemoveCallback} if it is not null. Afterwards, it will discard the given {@link RetrievableStateHandle}
-	 * if it is not null.
-	 *
-	 * @param <T> Type of the value stored in the RetrievableStateHandle
-	 */
-	private static final class RemoveBackgroundCallback<T extends Serializable> implements BackgroundCallback {
-		@Nullable
-		private final RetrievableStateHandle<T> stateHandle;
-
-		@Nullable
-		private final RemoveCallback<T> callback;
-
-		private final String pathInZooKeeper;
-
-		private RemoveBackgroundCallback(
-			@Nullable RetrievableStateHandle<T> stateHandle,
-			@Nullable RemoveCallback<T> callback,
-			String pathInZooKeeper) {
-
-			this.stateHandle = stateHandle;
-			this.callback = callback;
-			this.pathInZooKeeper = Preconditions.checkNotNull(pathInZooKeeper);
-		}
-
-		@Override
-		public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
-			try {
-				if (event.getType() == CuratorEventType.DELETE) {
-					final KeeperException.Code resultCode = KeeperException.Code.get(event.getResultCode());
-
-					if (resultCode == KeeperException.Code.OK) {
-						Exception exception = null;
-
-						if (null != callback) {
-							try {
-								callback.apply(stateHandle);
-							} catch (Throwable e) {
-								exception = new Exception("Could not execute delete action for node " +
-									pathInZooKeeper + '.', e);
-							}
-						}
-
-						if (stateHandle != null) {
-							try {
-								// Discard the state handle
-								stateHandle.discardState();
-							} catch (Throwable e) {
-								Exception newException = new Exception("Could not discard state handle of node " +
-									pathInZooKeeper + '.', e);
-
-								if (exception == null) {
-									exception = newException;
-								} else {
-									exception.addSuppressed(newException);
-								}
-							}
-						}
-
-						if (exception != null) {
-							throw exception;
-						}
-					} else if (resultCode == KeeperException.Code.NOTEMPTY) {
-						// Could not delete the node because it still contains children/locks
-						LOG.debug("Could not delete node " + pathInZooKeeper + " because it is still locked.");
-					} else {
-						throw new IllegalStateException("Unexpected result code " +
-							resultCode.name() + " in '" + event + "' callback.");
-					}
-				} else {
-					throw new IllegalStateException("Unexpected event type " +
-						event.getType() + " in '" + event + "' callback.");
-				}
-			} catch (Exception e) {
-				LOG.warn("Failed to run callback for delete operation on node " + pathInZooKeeper + '.', e);
-			}
-
-		}
-	}
-
-	/**
-	 * Callback interface for remove calls
-	 */
-	public interface RemoveCallback<T extends Serializable> {
-		/**
-		 * Callback method. The parameter can be null if the {@link RetrievableStateHandle} could not be retrieved
-		 * from ZooKeeper.
-		 *
-		 * @param value RetrievableStateHandle retrieved from ZooKeeper, null if it was not retrievable
-		 * @throws FlinkException If the callback failed
-		 */
-		void apply(@Nullable RetrievableStateHandle<T> value) throws FlinkException;
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
index d3b7dc5b379..3e294e0dbdd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.shared.SharedCount;
-import org.apache.curator.framework.recipes.shared.SharedValue;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+
 import java.io.Serializable;
-import java.util.concurrent.Executor;
 
 /**
  * Creates ZooKeeper utility classes without exposing the {@link CuratorFramework} dependency. The
@@ -71,7 +71,6 @@ public void close(boolean cleanup) throws Exception {
 	 *
 	 * @param zkStateHandleStorePath specifying the path in ZooKeeper to store the state handles to
 	 * @param stateStorageHelper storing the actual state data
-	 * @param executor to run asynchronous callbacks of the state handle store
 	 * @param <T> Type of the state to be stored
 	 * @return a ZooKeeperStateHandleStore instance
 	 * @throws Exception if ZooKeeper could not create the provided state handle store path in
@@ -79,8 +78,7 @@ public void close(boolean cleanup) throws Exception {
 	 */
 	public <T extends Serializable> ZooKeeperStateHandleStore<T> createZooKeeperStateHandleStore(
 			String zkStateHandleStorePath,
-			RetrievableStateStorageHelper<T> stateStorageHelper,
-			Executor executor) throws Exception {
+			RetrievableStateStorageHelper<T> stateStorageHelper) throws Exception {
 
 		facade.newNamespaceAwareEnsurePath(zkStateHandleStorePath).ensure(facade.getZookeeperClient());
 		CuratorFramework stateHandleStoreFacade = facade.usingNamespace(
@@ -88,7 +86,7 @@ public void close(boolean cleanup) throws Exception {
 				facade.getNamespace(),
 				zkStateHandleStorePath));
 
-		return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper, executor);
+		return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper);
 	}
 
 	/**
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index af42870ab69..27286969a0f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1727,27 +1727,27 @@ class JobManager(
     // Don't remove the job yet...
     val futureOption = currentJobs.get(jobID) match {
       case Some((eg, _)) =>
-        val result = if (removeJobFromStateBackend) {
-          val futureOption = Some(future {
-            try {
+        val result = Some(future {
+          try {
+            if (removeJobFromStateBackend) {
               // ...otherwise, we can have lingering resources when there is a  concurrent shutdown
               // and the ZooKeeper client is closed. Not removing the job immediately allow the
               // shutdown to release all resources.
               submittedJobGraphs.removeJobGraph(jobID)
-            } catch {
-              case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
+            } else {
+              submittedJobGraphs.releaseJobGraph(jobID)
             }
-          }(context.dispatcher))
+          } catch {
+            case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
+          }
+        }(context.dispatcher))
 
+        if (removeJobFromStateBackend) {
           try {
             archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg.archive()))
           } catch {
             case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
           }
-
-          futureOption
-        } else {
-          None
         }
 
         currentJobs.remove(jobID)
@@ -1772,18 +1772,23 @@ class JobManager(
     */
   private def cancelAndClearEverything(cause: Throwable)
     : Seq[Future[Unit]] = {
-    val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
-      future {
-        eg.suspend(cause)
+
+    val futures = currentJobs.values.flatMap(
+      egJobInfo => {
+        val executionGraph = egJobInfo._1
+        val jobInfo = egJobInfo._2
+
+        executionGraph.suspend(cause)
+
+        val jobId = executionGraph.getJobID
 
         jobInfo.notifyNonDetachedClients(
           decorateMessage(
             Failure(
-              new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))))
-      }(context.dispatcher)
-    }
+              new JobExecutionException(jobId, "All jobs are cancelled and cleared.", cause))))
 
-    currentJobs.clear()
+        removeJob(jobId, false)
+      })
 
     futures.toSeq
   }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 320dc2df52b..b1fa9b4124e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -191,7 +191,7 @@ public void testDiscardAllCheckpoints() throws Exception {
 
 	// ---------------------------------------------------------------------------------------------
 
-	protected TestCompletedCheckpoint createCheckpoint(
+	public static TestCompletedCheckpoint createCheckpoint(
 		int id,
 		SharedStateRegistry sharedStateRegistry) throws IOException {
 
@@ -224,7 +224,12 @@ protected void verifyCheckpointRegistered(Collection<OperatorState> operatorStat
 		}
 	}
 
-	protected void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
+	public static void verifyCheckpointDiscarded(TestCompletedCheckpoint completedCheckpoint) {
+		assertTrue(completedCheckpoint.isDiscarded());
+		verifyCheckpointDiscarded(completedCheckpoint.getOperatorStates().values());
+	}
+
+	protected static void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
 		for (OperatorState operatorState : operatorStates) {
 			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
 				Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).discarded);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
new file mode 100644
index 00000000000..4e7f2405795
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.ErrorListenerPathable;
+import org.apache.curator.utils.EnsurePath;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Mockito based tests for the {@link ZooKeeperCompletedCheckpointStore}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
+public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
+
+	/**
+	 * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
+	 * and ignores those which cannot be retrieved via their state handles.
+	 *
+	 * <p>We have a timeout in case the ZooKeeper store gets into a deadlock/livelock situation.
+	 */
+	@Test(timeout = 50000)
+	public void testCheckpointRecovery() throws Exception {
+		final JobID jobID = new JobID();
+		final long checkpoint1Id = 1L;
+		final long checkpoint2Id = 2;
+		final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4);
+
+		final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
+		expectedCheckpointIds.add(1L);
+		expectedCheckpointIds.add(2L);
+
+		final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
+		when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
+
+		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
+		when(retrievableStateHandle1.retrieveState()).then(
+			(invocation) -> new CompletedCheckpoint(
+				jobID,
+				checkpoint1Id,
+				1L,
+				1L,
+				new HashMap<>(),
+				null,
+				CheckpointProperties.forStandardCheckpoint(),
+				null, null));
+
+		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class);
+		when(retrievableStateHandle2.retrieveState()).then(
+			(invocation -> new CompletedCheckpoint(
+				jobID,
+				checkpoint2Id,
+				1L,
+				1L,
+				new HashMap<>(),
+				null,
+				CheckpointProperties.forStandardCheckpoint(),
+				null, null)));
+
+		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
+		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
+		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
+		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
+
+		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
+		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
+		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
+		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
+
+		final int numCheckpointsToRetain = 1;
+
+		// Mocking for the delete operation on the CuratorFramework client
+		// It ensures that the callback is executed synchronously
+
+		final EnsurePath ensurePathMock = mock(EnsurePath.class);
+		final CuratorEvent curatorEventMock = mock(CuratorEvent.class);
+		when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE);
+		when(curatorEventMock.getResultCode()).thenReturn(0);
+		when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock);
+
+		when(
+			client
+				.delete()
+				.inBackground(any(BackgroundCallback.class), any(Executor.class))
+		).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
+			@Override
+			public ErrorListenerPathable<Void> answer(InvocationOnMock invocation) throws Throwable {
+				final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0];
+
+				ErrorListenerPathable<Void> result = mock(ErrorListenerPathable.class);
+
+				when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
+					@Override
+					public Void answer(InvocationOnMock invocation) throws Throwable {
+
+						callback.processResult(client, curatorEventMock);
+
+						return null;
+					}
+				});
+
+				return result;
+			}
+		});
+
+		final String checkpointsPath = "foobar";
+		final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
+			numCheckpointsToRetain,
+			client,
+			checkpointsPath,
+			stateStorage,
+			Executors.directExecutor());
+
+		zooKeeperCompletedCheckpointStore.recover();
+
+		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
+
+		// check that we return the latest retrievable checkpoint
+		// this should remove the latest checkpoint because it is broken
+		assertEquals(checkpoint2Id, latestCompletedCheckpoint.getCheckpointID());
+
+		// this should remove the second broken checkpoint because we're iterating over all checkpoints
+		List<CompletedCheckpoint> completedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+
+		Collection<Long> actualCheckpointIds = new HashSet<>(completedCheckpoints.size());
+
+		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
+			actualCheckpointIds.add(completedCheckpoint.getCheckpointID());
+		}
+
+		assertEquals(expectedCheckpointIds, actualCheckpointIds);
+
+		// check that we did not discard any of the state handles
+		verify(retrievableStateHandle1, never()).discardState();
+		verify(retrievableStateHandle2, never()).discardState();
+
+		// Make sure that we also didn't discard any of the broken handles. Only when checkpoints
+		// are subsumed should they be discarded.
+		verify(failingRetrievableStateHandle, never()).discardState();
+	}
+
+	/**
+	 * Tests that the checkpoint does not exist in the store when we fail to add
+	 * it into the store (i.e., when the method throws an exception).
+	 */
+	@Test
+	public void testAddCheckpointWithFailedRemove() throws Exception {
+		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
+		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
+			spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
+		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
+
+		doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
+			@Override
+			public RetrievableStateHandle<CompletedCheckpoint> answer(InvocationOnMock invocationOnMock) throws Throwable {
+				CompletedCheckpoint checkpoint = (CompletedCheckpoint) invocationOnMock.getArguments()[1];
+
+				RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle = mock(RetrievableStateHandle.class);
+				when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
+
+				return retrievableStateHandle;
+			}
+		}).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class));
+
+		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString());
+
+		final int numCheckpointsToRetain = 1;
+		final String checkpointsPath = "foobar";
+		final RetrievableStateStorageHelper<CompletedCheckpoint> stateSotrage = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
+			numCheckpointsToRetain,
+			client,
+			checkpointsPath,
+			stateSotrage,
+			Executors.directExecutor());
+
+		for (long i = 0; i <= numCheckpointsToRetain; ++i) {
+			CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
+			doReturn(i).when(checkpointToAdd).getCheckpointID();
+			doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
+
+			try {
+				zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
+
+				// The checkpoint should be in the store if we successfully add it into the store.
+				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+				assertTrue(addedCheckpoints.contains(checkpointToAdd));
+			} catch (Exception e) {
+				// The checkpoint should not be in the store if any exception is thrown.
+				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+				assertFalse(addedCheckpoints.contains(checkpointToAdd));
+			}
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 7156cb5e119..99f21c64213 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,59 +18,39 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
-import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.ErrorListenerPathable;
-import org.apache.curator.utils.EnsurePath;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.io.Serializable;
 import java.util.List;
-import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for {@link ZooKeeperCompletedCheckpointStore}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
 public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 
+	@ClassRule
+	public static ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
+
 	@Test
 	public void testPathConversion() {
 		final long checkpointId = 42L;
@@ -80,189 +60,101 @@ public void testPathConversion() {
 		assertEquals(checkpointId, ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path));
 	}
 
+	@Test
+	public void testDiscardingSubsumedCheckpoints() throws Exception {
+		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
+
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperCompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client);
+
+		try {
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
+
+			checkpointStore.addCheckpoint(checkpoint1);
+			assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));
+
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint2 = CompletedCheckpointStoreTest.createCheckpoint(1, sharedStateRegistry);
+			checkpointStore.addCheckpoint(checkpoint2);
+			final List<CompletedCheckpoint> allCheckpoints = checkpointStore.getAllCheckpoints();
+			assertThat(allCheckpoints, Matchers.contains(checkpoint2));
+			assertThat(allCheckpoints, Matchers.not(Matchers.contains(checkpoint1)));
+
+			// verify that the subsumed checkpoint is discarded
+			CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+		} finally {
+			client.close();
+		}
+	}
+
 	/**
-	 * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
-	 * and ignores those which cannot be retrieved via their state handles.
-	 *
-	 * <p>We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation.
+	 * Tests that checkpoints are discarded when the completed checkpoint store is shut
+	 * down with a globally terminal state.
 	 */
-	@Test(timeout = 50000)
-	public void testCheckpointRecovery() throws Exception {
-		final JobID jobID = new JobID();
-		final long checkpoint1Id = 1L;
-		final long checkpoint2Id = 2;
-		final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4);
-
-		final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
-		expectedCheckpointIds.add(1L);
-		expectedCheckpointIds.add(2L);
-
-		final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
-		when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
-
-		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
-		when(retrievableStateHandle1.retrieveState()).then(
-			(invocation) -> new CompletedCheckpoint(
-				jobID,
-				checkpoint1Id,
-				1L,
-				1L,
-				new HashMap<>(),
-				null,
-				CheckpointProperties.forStandardCheckpoint(),
-				null, null));
-
-		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class);
-		when(retrievableStateHandle2.retrieveState()).then(
-			(invocation -> new CompletedCheckpoint(
-				jobID,
-				checkpoint2Id,
-				1L,
-				1L,
-				new HashMap<>(),
-				null,
-				CheckpointProperties.forStandardCheckpoint(),
-				null, null)));
-
-		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
-		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
-		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
-		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
-
-		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
-		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
-
-		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
-		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
-
-		final int numCheckpointsToRetain = 1;
-
-		// Mocking for the delete operation on the CuratorFramework client
-		// It assures that the callback is executed synchronously
-
-		final EnsurePath ensurePathMock = mock(EnsurePath.class);
-		final CuratorEvent curatorEventMock = mock(CuratorEvent.class);
-		when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE);
-		when(curatorEventMock.getResultCode()).thenReturn(0);
-		when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock);
-
-		when(
-			client
-				.delete()
-				.inBackground(any(BackgroundCallback.class), any(Executor.class))
-		).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
-			@Override
-			public ErrorListenerPathable<Void> answer(InvocationOnMock invocation) throws Throwable {
-				final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0];
+	@Test
+	public void testDiscardingCheckpointsAtShutDown() throws Exception {
+		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
 
-				ErrorListenerPathable<Void> result = mock(ErrorListenerPathable.class);
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperCompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client);
 
-				when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
-					@Override
-					public Void answer(InvocationOnMock invocation) throws Throwable {
+		try {
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
 
-						callback.processResult(client, curatorEventMock);
+			checkpointStore.addCheckpoint(checkpoint1);
+			assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));
 
-						return null;
-					}
-				});
+			checkpointStore.shutdown(JobStatus.FINISHED);
 
-				return result;
-			}
-		});
-
-		final String checkpointsPath = "foobar";
-		final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);
+			// verify that the checkpoint is discarded
+			CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+		} finally {
+			client.close();
+		}
+	}
 
-		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-			numCheckpointsToRetain,
+	@Nonnull
+	private ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore(CuratorFramework client) throws Exception {
+		return new ZooKeeperCompletedCheckpointStore(
+			1,
 			client,
-			checkpointsPath,
-			stateStorage,
+			"/checkpoints",
+			new TestingRetrievableStateStorageHelper<>(),
 			Executors.directExecutor());
+	}
 
-		zooKeeperCompletedCheckpointStore.recover();
-
-		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
-
-		// check that we return the latest retrievable checkpoint
-		// this should remove the latest checkpoint because it is broken
-		assertEquals(checkpoint2Id, latestCompletedCheckpoint.getCheckpointID());
-
-		// this should remove the second broken checkpoint because we're iterating over all checkpoints
-		List<CompletedCheckpoint> completedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-
-		Collection<Long> actualCheckpointIds = new HashSet<>(completedCheckpoints.size());
-
-		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
-			actualCheckpointIds.add(completedCheckpoint.getCheckpointID());
+	private static final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
+		@Override
+		public RetrievableStateHandle<T> store(T state) {
+			return new TestingRetrievableStateHandle<>(state);
 		}
 
-		assertEquals(expectedCheckpointIds, actualCheckpointIds);
-
-		// check that we did not discard any of the state handles
-		verify(retrievableStateHandle1, never()).discardState();
-		verify(retrievableStateHandle2, never()).discardState();
+		private static class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
 
-		// Make sure that we also didn't discard any of the broken handles. Only when checkpoints
-		// are subsumed should they be discarded.
-		verify(failingRetrievableStateHandle, never()).discardState();
-	}
+			private static final long serialVersionUID = 137053380713794300L;
 
-	/**
-	 * Tests that the checkpoint does not exist in the store when we fail to add
-	 * it into the store (i.e., there exists an exception thrown by the method).
-	 */
-	@Test
-	public void testAddCheckpointWithFailedRemove() throws Exception {
-		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
-		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+			private final T state;
 
-		ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
-			spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
-		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
+			private TestingRetrievableStateHandle(T state) {
+				this.state = state;
+			}
 
-		doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
 			@Override
-			public RetrievableStateHandle<CompletedCheckpoint> answer(InvocationOnMock invocationOnMock) throws Throwable {
-				CompletedCheckpoint checkpoint = (CompletedCheckpoint) invocationOnMock.getArguments()[1];
-
-				RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle = mock(RetrievableStateHandle.class);
-				when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
-
-				return retrievableStateHandle;
+			public T retrieveState() throws IOException, ClassNotFoundException {
+				return state;
 			}
-		}).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class));
-
-		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(), any(ZooKeeperStateHandleStore.RemoveCallback.class));
 
-		final int numCheckpointsToRetain = 1;
-		final String checkpointsPath = "foobar";
-		final RetrievableStateStorageHelper<CompletedCheckpoint> stateSotrage = mock(RetrievableStateStorageHelper.class);
-
-		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-			numCheckpointsToRetain,
-			client,
-			checkpointsPath,
-			stateSotrage,
-			Executors.directExecutor());
+			@Override
+			public void discardState() throws Exception {
+				// no op
+			}
 
-		for (long i = 0; i <= numCheckpointsToRetain; ++i) {
-			CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
-			doReturn(i).when(checkpointToAdd).getCheckpointID();
-			doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-
-			try {
-				zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
-
-				// The checkpoint should be in the store if we successfully add it into the store.
-				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-				assertTrue(addedCheckpoints.contains(checkpointToAdd));
-			} catch (Exception e) {
-				// The checkpoint should not be in the store if any exception is thrown.
-				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-				assertFalse(addedCheckpoints.contains(checkpointToAdd));
+			@Override
+			public long getStateSize() {
+				return 0;
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 88141d6ca71..ea5e01f385d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -514,6 +514,11 @@ public void removeJobGraph(JobID jobId) throws Exception {
 			storedJobs.remove(jobId);
 		}
 
+		@Override
+		public void releaseJobGraph(JobID jobId) throws Exception {
+			// no op
+		}
+
 		@Override
 		public Collection<JobID> getJobIds() throws Exception {
 			return storedJobs.keySet();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
new file mode 100644
index 00000000000..eb58557c0f8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.PoisonPill;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+	@ClassRule
+	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS);
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		final Future<Terminated> terminationFuture = system.terminate();
+		Await.ready(terminationFuture, TIMEOUT);
+	}
+
+	/**
+	 * Tests that the {@link JobManager} releases all locked {@link JobGraph} if it loses
+	 * leadership.
+	 */
+	@Test
+	public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		highAvailabilityServices.setJobMasterLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID, leaderElectionService);
+		highAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration));
+		highAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+
+		final CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(otherClient, configuration);
+		otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
+
+		ActorRef jobManagerActorRef = null;
+		try {
+			jobManagerActorRef = JobManager.startJobManagerActors(
+				configuration,
+				system,
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
+				new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration)),
+				Option.empty(),
+				TestingJobManager.class,
+				MemoryArchivist.class)._1();
+
+			waitForActorToBeStarted(jobManagerActorRef, TIMEOUT);
+
+			final ActorGateway jobManager = new AkkaActorGateway(jobManagerActorRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+			leaderElectionService.isLeader(HighAvailabilityServices.DEFAULT_LEADER_ID).get();
+
+			final JobGraph nonEmptyJobGraph = createNonEmptyJobGraph();
+
+			final JobManagerMessages.SubmitJob submitJobMessage = new JobManagerMessages.SubmitJob(nonEmptyJobGraph, ListeningBehaviour.DETACHED);
+
+			Await.result(jobManager.ask(submitJobMessage, TIMEOUT), TIMEOUT);
+
+			Collection<JobID> jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+			final JobID jobId = nonEmptyJobGraph.getJobID();
+			assertThat(jobIds, contains(jobId));
+
+			// revoke the leadership
+			leaderElectionService.notLeader();
+
+			Await.result(jobManager.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(), TIMEOUT), TIMEOUT);
+
+			final SubmittedJobGraph recoveredJobGraph = akka.serialization.JavaSerializer.currentSystem().withValue(
+				((ExtendedActorSystem) system),
+				() -> otherSubmittedJobGraphStore.recoverJobGraph(jobId));
+
+			assertThat(recoveredJobGraph, is(notNullValue()));
+
+			otherSubmittedJobGraphStore.removeJobGraph(jobId);
+
+			jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+			assertThat(jobIds, not(contains(jobId)));
+		} finally {
+			client.close();
+			otherClient.close();
+
+			if (jobManagerActorRef != null) {
+				jobManagerActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
+	}
+
+	private JobGraph createNonEmptyJobGraph() {
+		final JobVertex noOpVertex = new JobVertex("NoOp vertex");
+		noOpVertex.setInvokableClass(NoOpInvokable.class);
+		final JobGraph jobGraph = new JobGraph(noOpVertex);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		return jobGraph;
+	}
+
+	private void waitForActorToBeStarted(ActorRef jobManagerActorRef, FiniteDuration timeout) throws InterruptedException, java.util.concurrent.TimeoutException {
+		Await.ready(Patterns.ask(jobManagerActorRef, new Identify(42), timeout.toMillis()), timeout);
+	}
+
+	enum NoOpSubmittedJobGraphListener implements SubmittedJobGraphStore.SubmittedJobGraphListener {
+		INSTANCE;
+
+		@Override
+		public void onAddedJobGraph(JobID jobId) {
+			// no op
+		}
+
+		@Override
+		public void onRemovedJobGraph(JobID jobId) {
+			// no op
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 9454d90e05c..d89894aff27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -21,7 +21,6 @@
 import akka.actor.ActorRef;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
@@ -90,8 +89,7 @@ public void testPutAndRemoveJobGraph() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
 			ZooKeeper.createClient(),
 			"/testPutAndRemoveJobGraph",
-			localStateStorage,
-			Executors.directExecutor());
+			localStateStorage);
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -147,7 +145,7 @@ public void testPutAndRemoveJobGraph() throws Exception {
 	@Test
 	public void testRecoverJobGraphs() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -198,10 +196,10 @@ public void testConcurrentAddJobGraph() throws Exception {
 
 		try {
 			jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
 
 			otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
 
 
 			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
@@ -257,10 +255,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
 	@Test(expected = IllegalStateException.class)
 	public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
 		ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
 		jobGraphs.start(null);
 		otherJobGraphs.start(null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
new file mode 100644
index 00000000000..c4c56949cd9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * {@link ExternalResource} which starts a {@link org.apache.zookeeper.server.ZooKeeperServer}.
+ */
+public class ZooKeeperResource extends ExternalResource {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperResource.class);
+
+	@Nullable
+	private TestingServer zooKeeperServer;
+
+	public String getConnectString() {
+		verifyIsRunning();
+		return zooKeeperServer.getConnectString();
+	}
+
+	private void verifyIsRunning() {
+		Preconditions.checkState(zooKeeperServer != null);
+	}
+
+	@Override
+	protected void before() throws Throwable {
+		terminateZooKeeperServer();
+		zooKeeperServer = new TestingServer(true);
+	}
+
+	private void terminateZooKeeperServer() throws IOException {
+		if (zooKeeperServer != null) {
+			zooKeeperServer.stop();
+			zooKeeperServer = null;
+		}
+	}
+
+	@Override
+	protected void after() {
+		try {
+			terminateZooKeeperServer();
+		} catch (IOException e) {
+			LOG.warn("Could not properly terminate the {}.", getClass().getSimpleName(), e);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index fd39b25991c..2dd27e7c897 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -18,21 +18,19 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,7 +39,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -49,12 +46,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -88,8 +80,8 @@ public void cleanUp() throws Exception {
 	@Test
 	public void testAddAndLock() throws Exception {
 		LongStateStorage longStateStorage = new LongStateStorage();
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZOOKEEPER.getClient(), longStateStorage, Executors.directExecutor());
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+			ZOOKEEPER.getClient(), longStateStorage);
 
 		// Config
 		final String pathInZooKeeper = "/testAdd";
@@ -136,7 +128,7 @@ public void testAddAlreadyExistingPath() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath");
 
@@ -161,7 +153,7 @@ public void testAddDiscardStateHandleAfterFailure() throws Exception {
 		when(client.inTransaction().create()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
+				client, stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
@@ -191,7 +183,7 @@ public void testReplace() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testReplace";
@@ -230,7 +222,7 @@ public void testReplaceNonExistingPath() throws Exception {
 		RetrievableStateStorageHelper<Long> stateStorage = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateStorage, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateStorage);
 
 		store.replace("/testReplaceNonExistingPath", 0, 1L);
 	}
@@ -247,7 +239,7 @@ public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
 		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
+				client, stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
@@ -289,7 +281,7 @@ public void testGetAndExists() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testGetAndExists";
@@ -314,7 +306,7 @@ public void testGetNonExistingPath() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		store.getAndLock("/testGetNonExistingPath");
 	}
@@ -328,7 +320,7 @@ public void testGetAll() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testGetAll";
@@ -359,7 +351,7 @@ public void testGetAllSortedByName() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String basePath = "/testGetAllSortedByName";
@@ -393,7 +385,7 @@ public void testRemove() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testRemove";
@@ -401,50 +393,14 @@ public void testRemove() throws Exception {
 
 		store.addAndLock(pathInZooKeeper, state);
 
+		final int numberOfGlobalDiscardCalls = LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls();
+
 		// Test
 		store.releaseAndTryRemove(pathInZooKeeper);
 
 		// Verify discarded
 		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
-	}
-
-	/**
-	 * Tests that state handles are correctly removed with a callback.
-	 */
-	@Test
-	public void testRemoveWithCallback() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testRemoveWithCallback";
-		final Long state = 27255442L;
-
-		store.addAndLock(pathInZooKeeper, state);
-
-		final CountDownLatch sync = new CountDownLatch(1);
-		ZooKeeperStateHandleStore.RemoveCallback<Long> callback = mock(ZooKeeperStateHandleStore.RemoveCallback.class);
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				sync.countDown();
-				return null;
-			}
-		}).when(callback).apply(any(RetrievableStateHandle.class));
-
-		// Test
-		store.releaseAndTryRemove(pathInZooKeeper, callback);
-
-		// Verify discarded and callback called
-		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
-
-		sync.await();
-
-		verify(callback, times(1))
-				.apply(any(RetrievableStateHandle.class));
+		assertEquals(numberOfGlobalDiscardCalls + 1, LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls());
 	}
 
 	/** Tests that all state handles are correctly discarded. */
@@ -454,7 +410,7 @@ public void testReleaseAndTryRemoveAll() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testDiscardAll";
@@ -486,8 +442,7 @@ public void testCorruptedData() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			stateStorage,
-			Executors.directExecutor());
+			stateStorage);
 
 		final Collection<Long> input = new HashSet<>();
 		input.add(1L);
@@ -543,13 +498,11 @@ public void testConcurrentDeleteOperation() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String statePath = "/state";
 
@@ -586,13 +539,11 @@ public void testLockCleanupWhenGetAndLockFails() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String path = "/state";
 
@@ -649,8 +600,7 @@ public void testLockCleanupWhenClientTimesOut() throws Exception {
 
 			ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 				client,
-				longStateStorage,
-				Executors.directExecutor());
+				longStateStorage);
 
 			final String path = "/state";
 
@@ -682,8 +632,7 @@ public void testRelease() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String path = "/state";
 
@@ -720,8 +669,7 @@ public void testReleaseAll() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final Collection<String> paths = Arrays.asList("/state1", "/state2", "/state3");
 
@@ -775,9 +723,11 @@ public void testReleaseAll() throws Exception {
 
 		private static final long serialVersionUID = -3555329254423838912L;
 
+		private static int numberOfGlobalDiscardCalls = 0;
+
 		private final Long state;
 
-		private int numberOfDiscardCalls;
+		private int numberOfDiscardCalls = 0;
 
 		public LongRetrievableStateHandle(Long state) {
 			this.state = state;
@@ -790,6 +740,7 @@ public Long retrieveState() {
 
 		@Override
 		public void discardState() throws Exception {
+			numberOfGlobalDiscardCalls++;
 			numberOfDiscardCalls++;
 		}
 
@@ -798,8 +749,12 @@ public long getStateSize() {
 			return 0;
 		}
 
-		public int getNumberOfDiscardCalls() {
+		int getNumberOfDiscardCalls() {
 			return numberOfDiscardCalls;
 		}
+
+		public static int getNumberOfGlobalDiscardCalls() {
+			return numberOfGlobalDiscardCalls;
+		}
 	}
 }
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index cd88133a0bd..2609dc3b7fe 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -440,6 +440,14 @@ trait TestingJobManagerLike extends FlinkActor {
         val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
         receiver ! Acknowledge.get()
       }
+
+    case WaitForBackgroundTasksToFinish =>
+      val future = futuresToComplete match {
+        case Some(futures) => Future.sequence(futures)
+        case None => Future.successful(Seq())
+      }
+
+      future.pipeTo(sender())
   }
 
   def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index f79c1249bc7..4decfcfc76c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -58,6 +58,8 @@ object TestingJobManagerMessages {
 
   case object NotifyListeners
 
+  case object WaitForBackgroundTasksToFinish
+
   case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
   case class TaskManagerTerminated(taskManager: ActorRef)
 
@@ -164,4 +166,5 @@ object TestingJobManagerMessages {
   def getClientConnected(): AnyRef = ClientConnected
   def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
 
+  def getWaitForBackgroundTasksToFinish(): AnyRef = WaitForBackgroundTasksToFinish
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services