Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/09/12 14:46:02 UTC

[jira] [Commented] (FLINK-10185) Make ZooKeeperStateHandleStore#releaseAndTryRemove synchronous

    [ https://issues.apache.org/jira/browse/FLINK-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612244#comment-16612244 ] 

ASF GitHub Bot commented on FLINK-10185:
----------------------------------------

asfgit closed pull request #6586: [FLINK-10185] Make ZooKeeperStateHandleStore#releaseAndTryRemove synchronous
URL: https://github.com/apache/flink/pull/6586
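
For readers skimming the diff, the caller-side pattern this change introduces can be sketched roughly as follows. This is a minimal, self-contained illustration and not Flink code: InMemoryStore is a hypothetical stand-in for ZooKeeperStateHandleStore that models locks as a simple counter, and the direct executor is an assumption made only to keep the example deterministic. The one thing taken from the diff is the shape of the call: releaseAndTryRemove now returns a boolean synchronously, and the caller hands the potentially blocking discard to an Executor only when the removal succeeded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

public class SyncRemoveSketch {

    /** Hypothetical in-memory stand-in for the store; the map value is the number of locks still held. */
    static final class InMemoryStore {
        private final Map<String, Integer> locks = new HashMap<>();

        void put(String path, int lockCount) {
            locks.put(path, lockCount);
        }

        /**
         * Releases one lock and removes the node only if no locks remain.
         * Returns true if the node was removed. Simplification: an unknown
         * path returns false here instead of raising an error.
         */
        boolean releaseAndTryRemove(String path) {
            Integer held = locks.get(path);
            if (held == null) {
                return false;
            }
            int remaining = Math.max(0, held - 1);
            if (remaining > 0) {
                locks.put(path, remaining); // still locked by someone else
                return false;
            }
            locks.remove(path);
            return true;
        }
    }

    /** Removes two nodes and returns the paths whose state was handed off for discarding. */
    static List<String> demo() {
        List<String> discarded = new ArrayList<>();
        Executor executor = Runnable::run; // assumption: direct executor, for determinism

        InMemoryStore store = new InMemoryStore();
        store.put("/1", 1); // locked only by us
        store.put("/2", 2); // also locked by another (hypothetical) store instance

        for (String path : new String[] {"/1", "/2"}) {
            // Synchronous result first, then offload the expensive discard.
            if (store.releaseAndTryRemove(path)) {
                executor.execute(() -> discarded.add(path));
            }
        }
        return discarded;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch only "/1" is handed to the executor, because "/2" is still locked after the release. In the actual change the work submitted to the executor is discardOnSubsume() or discardOnShutdown(jobStatus), as shown in the diff below.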
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb833a3a..45d11412c50 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Exec
 
 		ZooKeeperStateHandleStore<MesosWorkerStore.Worker> zooKeeperStateHandleStore = zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
 			"/workers",
-			stateStorageHelper,
-			executor);
+			stateStorageHelper);
 
 		ZooKeeperSharedValue frameworkId = zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
 		ZooKeeperSharedCount totalTaskCount = zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index f22127041d3..533026041f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -31,8 +31,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +84,8 @@
 	 */
 	private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
+	private final Executor executor;
+
 	/**
 	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 	 *
@@ -98,7 +98,7 @@
 	 *                                       start with a '/')
 	 * @param stateStorage                   State storage to be used to persist the completed
 	 *                                       checkpoint
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
+	 * @param executor to execute blocking calls
 	 * @throws Exception
 	 */
 	public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +123,12 @@ public ZooKeeperCompletedCheckpointStore(
 		// All operations will have the path as root
 		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
 
-		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
+		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);
 
 		this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 
+		this.executor = checkNotNull(executor);
+
 		LOG.info("Initialized in '{}'.", checkpointsPath);
 	}
 
@@ -237,7 +239,18 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
 		// Everything worked, let's remove a previous checkpoint if necessary.
 		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
 			try {
-				removeSubsumed(completedCheckpoints.removeFirst());
+				final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
+
+				if (tryRemove(completedCheckpoint.getCheckpointID())) {
+					executor.execute(() -> {
+						try {
+							completedCheckpoint.discardOnSubsume();
+						} catch (Exception e) {
+							LOG.warn("Could not discard subsumed completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
+						}
+					});
+
+				}
 			} catch (Exception e) {
 				LOG.warn("Failed to subsume the old checkpoint", e);
 			}
@@ -279,7 +292,17 @@ public void shutdown(JobStatus jobStatus) throws Exception {
 
 			for (CompletedCheckpoint checkpoint : completedCheckpoints) {
 				try {
-					removeShutdown(checkpoint, jobStatus);
+					if (tryRemove(checkpoint.getCheckpointID())) {
+						executor.execute(
+							() -> {
+								try {
+									checkpoint.discardOnShutdown(jobStatus);
+								} catch (Exception e) {
+									LOG.warn("Could not discard completed checkpoint {} on shutdown.", checkpoint.getCheckpointID(), e);
+								}
+							}
+						);
+					}
 				} catch (Exception e) {
 					LOG.error("Failed to discard checkpoint.", e);
 				}
@@ -305,59 +328,13 @@ public void shutdown(JobStatus jobStatus) throws Exception {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Removes a subsumed checkpoint from ZooKeeper and drops the state.
-	 */
-	private void removeSubsumed(
-		final CompletedCheckpoint completedCheckpoint) throws Exception {
-
-		if (completedCheckpoint == null) {
-			return;
-		}
-
-		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action =
-			new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-				@Override
-				public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-					if (value != null) {
-						try {
-							completedCheckpoint.discardOnSubsume();
-						} catch (Exception e) {
-							throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
-						}
-					}
-				}
-			};
-
-		checkpointsInZooKeeper.releaseAndTryRemove(
-			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-			action);
-	}
-
-	/**
-	 * Removes a checkpoint from ZooKeeper because of Job shutdown and drops the state.
+	 * Tries to remove the checkpoint identified by the given checkpoint id.
+	 *
+	 * @param checkpointId identifying the checkpoint to remove
+	 * @return true if the checkpoint could be removed
 	 */
-	private void removeShutdown(
-			final CompletedCheckpoint completedCheckpoint,
-			final JobStatus jobStatus) throws Exception {
-
-		if (completedCheckpoint == null) {
-			return;
-		}
-
-		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-			@Override
-			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-				try {
-					completedCheckpoint.discardOnShutdown(jobStatus);
-				} catch (Exception e) {
-					throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
-				}
-			}
-		};
-
-		checkpointsInZooKeeper.releaseAndTryRemove(
-			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-			removeAction);
+	private boolean tryRemove(long checkpointId) throws Exception {
+		return checkpointsInZooKeeper.releaseAndTryRemove(checkpointIdToPath(checkpointId));
 	}
 
 	/**
@@ -381,7 +358,7 @@ public static long pathToCheckpointId(String path) {
 			String numberString;
 
 			// check if we have a leading slash
-			if ('/' == path.charAt(0) ) {
+			if ('/' == path.charAt(0)) {
 				numberString = path.substring(1);
 			} else {
 				numberString = path;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 3882479ce95..ea96d7d43d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -179,7 +179,7 @@ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
 
 	@Override
 	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
-		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
+		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 7ba5d481177..0510815c686 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -41,7 +41,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -99,14 +98,12 @@
 	 * @param client ZooKeeper client
 	 * @param currentJobsPath ZooKeeper path for current job graphs
 	 * @param stateStorage State storage used to persist the submitted jobs
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
 	 * @throws Exception
 	 */
 	public ZooKeeperSubmittedJobGraphStore(
 			CuratorFramework client,
 			String currentJobsPath,
-			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage,
-			Executor executor) throws Exception {
+			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
 
 		checkNotNull(currentJobsPath, "Current jobs path");
 		checkNotNull(stateStorage, "State storage");
@@ -123,7 +120,7 @@ public ZooKeeperSubmittedJobGraphStore(
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
 		this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
-		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
+		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
 		pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 43c930e6fea..d9c91610712 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -227,14 +227,12 @@ public static ZooKeeperLeaderElectionService createLeaderElectionService(
 	 *
 	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 	 * @param configuration {@link Configuration} object
-	 * @param executor to run ZooKeeper callbacks
 	 * @return {@link ZooKeeperSubmittedJobGraphStore} instance
 	 * @throws Exception if the submitted job graph store cannot be created
 	 */
 	public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
 			CuratorFramework client,
-			Configuration configuration,
-			Executor executor) throws Exception {
+			Configuration configuration) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -244,7 +242,9 @@ public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
 		String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
-				client, zooKeeperSubmittedJobsPath, stateStorage, executor);
+			client,
+			zooKeeperSubmittedJobsPath,
+			stateStorage);
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 87a433adace..8c3d31fc51b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -18,17 +18,13 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -36,6 +32,7 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -44,7 +41,6 @@
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -68,13 +64,13 @@
  * State handle in ZooKeeper =&gt; State handle exists
  * </pre>
  *
- * But not:
+ * <p>But not:
  *
  * <pre>
  * State handle exists =&gt; State handle in ZooKeeper
  * </pre>
  *
- * There can be lingering state handles when failures happen during operation. They
+ * <p>There can be lingering state handles when failures happen during operation. They
  * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
  * FLINK-2513</a> about a possible way to overcome this).
  *
@@ -84,13 +80,11 @@
 
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-	/** Curator ZooKeeper client */
+	/** Curator ZooKeeper client. */
 	private final CuratorFramework client;
 
 	private final RetrievableStateStorageHelper<T> storage;
 
-	private final Executor executor;
-
 	/** Lock node name of this ZooKeeperStateHandleStore. The name should be unique among all other state handle stores. */
 	private final String lockNode;
 
@@ -103,16 +97,13 @@
 	 *                            instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
 	 * @param storage to persist the actual state and whose returned state handle is then written
 	 *                to ZooKeeper
-	 * @param executor to run the ZooKeeper callbacks
 	 */
 	public ZooKeeperStateHandleStore(
 		CuratorFramework client,
-		RetrievableStateStorageHelper<T> storage,
-		Executor executor) {
+		RetrievableStateStorageHelper<T> storage) {
 
 		this.client = checkNotNull(client, "Curator client");
 		this.storage = checkNotNull(storage, "State storage");
-		this.executor = checkNotNull(executor);
 
 		// Generate a unique lock node name
 		lockNode = UUID.randomUUID().toString();
@@ -262,7 +253,7 @@ public int exists(String pathInZooKeeper) throws Exception {
 	public Collection<String> getAllPaths() throws Exception {
 		final String path = "/";
 
-		while(true) {
+		while (true) {
 			Stat stat = client.checkExists().forPath(path);
 
 			if (stat == null) {
@@ -393,33 +384,14 @@ public int exists(String pathInZooKeeper) throws Exception {
 
 	/**
 	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
-	 * The deletion of the state node is executed asynchronously.
-	 *
-	 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
-	 * has been executed.
-	 *
-	 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
-	 * @throws Exception If the ZooKeeper operation fails
-	 */
-	public void releaseAndTryRemove(String pathInZooKeeper) throws Exception {
-		releaseAndTryRemove(pathInZooKeeper, null);
-	}
-
-	/**
-	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
-	 * The deletion of the state node is executed asynchronously. After the state node has been deleted, the given
-	 * callback is called with the {@link RetrievableStateHandle} of the deleted state node.
-	 *
-	 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
-	 * has been executed.
+	 * It returns the {@link RetrievableStateHandle} stored under the given state node if any.
 	 *
 	 * @param pathInZooKeeper Path of state handle to remove
-	 * @param callback The callback to execute after a successful deletion. Null if no action needs to be executed.
-	 * @throws Exception If the ZooKeeper operation fails
+	 * @return True if the state handle could be released
+	 * @throws Exception If the ZooKeeper operation or discarding the state handle fails
 	 */
-	public void releaseAndTryRemove(
-			String pathInZooKeeper,
-			@Nullable final RemoveCallback<T> callback) throws Exception {
+	@Nullable
+	public boolean releaseAndTryRemove(String pathInZooKeeper) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
 		final String path = normalizePath(pathInZooKeeper);
@@ -429,14 +401,23 @@ public void releaseAndTryRemove(
 		try {
 			stateHandle = get(path, false);
 		} catch (Exception e) {
-			LOG.warn("Could not retrieve the state handle from node " + path + '.', e);
+			LOG.warn("Could not retrieve the state handle from node {}.", path, e);
 		}
 
 		release(pathInZooKeeper);
 
-		final BackgroundCallback backgroundCallback = new RemoveBackgroundCallback<>(stateHandle, callback, path);
+		try {
+			client.delete().forPath(path);
+		} catch (KeeperException.NotEmptyException ignored) {
+			LOG.debug("Could not delete znode {} because it is still locked.", path);
+			return false;
+		}
+
+		if (stateHandle != null) {
+			stateHandle.discardState();
+		}
 
-		client.delete().inBackground(backgroundCallback, executor).forPath(path);
+		return true;
 	}
 
 	/**
@@ -583,7 +564,7 @@ protected String getLockPath(String rootPath) {
 	}
 
 	/**
-	 * Makes sure that every path starts with a "/"
+	 * Makes sure that every path starts with a "/".
 	 *
 	 * @param path Path to normalize
 	 * @return Normalized path such that it starts with a "/"
@@ -595,103 +576,4 @@ private static String normalizePath(String path) {
 			return '/' + path;
 		}
 	}
-
-	// ---------------------------------------------------------------------------------------------------------
-	// Utility classes
-	// ---------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Callback which is executed when removing a node from ZooKeeper. The callback will call the given
-	 * {@link RemoveCallback} if it is not null. Afterwards, it will discard the given {@link RetrievableStateHandle}
-	 * if it is not null.
-	 *
-	 * @param <T> Type of the value stored in the RetrievableStateHandle
-	 */
-	private static final class RemoveBackgroundCallback<T extends Serializable> implements BackgroundCallback {
-		@Nullable
-		private final RetrievableStateHandle<T> stateHandle;
-
-		@Nullable
-		private final RemoveCallback<T> callback;
-
-		private final String pathInZooKeeper;
-
-		private RemoveBackgroundCallback(
-			@Nullable RetrievableStateHandle<T> stateHandle,
-			@Nullable RemoveCallback<T> callback,
-			String pathInZooKeeper) {
-
-			this.stateHandle = stateHandle;
-			this.callback = callback;
-			this.pathInZooKeeper = Preconditions.checkNotNull(pathInZooKeeper);
-		}
-
-		@Override
-		public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
-			try {
-				if (event.getType() == CuratorEventType.DELETE) {
-					final KeeperException.Code resultCode = KeeperException.Code.get(event.getResultCode());
-
-					if (resultCode == KeeperException.Code.OK) {
-						Exception exception = null;
-
-						if (null != callback) {
-							try {
-								callback.apply(stateHandle);
-							} catch (Throwable e) {
-								exception = new Exception("Could not execute delete action for node " +
-									pathInZooKeeper + '.', e);
-							}
-						}
-
-						if (stateHandle != null) {
-							try {
-								// Discard the state handle
-								stateHandle.discardState();
-							} catch (Throwable e) {
-								Exception newException = new Exception("Could not discard state handle of node " +
-									pathInZooKeeper + '.', e);
-
-								if (exception == null) {
-									exception = newException;
-								} else {
-									exception.addSuppressed(newException);
-								}
-							}
-						}
-
-						if (exception != null) {
-							throw exception;
-						}
-					} else if (resultCode == KeeperException.Code.NOTEMPTY) {
-						// Could not delete the node because it still contains children/locks
-						LOG.debug("Could not delete node " + pathInZooKeeper + " because it is still locked.");
-					} else {
-						throw new IllegalStateException("Unexpected result code " +
-							resultCode.name() + " in '" + event + "' callback.");
-					}
-				} else {
-					throw new IllegalStateException("Unexpected event type " +
-						event.getType() + " in '" + event + "' callback.");
-				}
-			} catch (Exception e) {
-				LOG.warn("Failed to run callback for delete operation on node " + pathInZooKeeper + '.', e);
-			}
-
-		}
-	}
-
-	/**
-	 * Callback interface for remove calls
-	 */
-	public interface RemoveCallback<T extends Serializable> {
-		/**
-		 * Callback method. The parameter can be null if the {@link RetrievableStateHandle} could not be retrieved
-		 * from ZooKeeper.
-		 *
-		 * @param value RetrievableStateHandle retrieved from ZooKeeper, null if it was not retrievable
-		 * @throws FlinkException If the callback failed
-		 */
-		void apply(@Nullable RetrievableStateHandle<T> value) throws FlinkException;
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
index d3b7dc5b379..3e294e0dbdd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.shared.SharedCount;
-import org.apache.curator.framework.recipes.shared.SharedValue;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+
 import java.io.Serializable;
-import java.util.concurrent.Executor;
 
 /**
  * Creates ZooKeeper utility classes without exposing the {@link CuratorFramework} dependency. The
@@ -71,7 +71,6 @@ public void close(boolean cleanup) throws Exception {
 	 *
 	 * @param zkStateHandleStorePath specifying the path in ZooKeeper to store the state handles to
 	 * @param stateStorageHelper storing the actual state data
-	 * @param executor to run asynchronous callbacks of the state handle store
 	 * @param <T> Type of the state to be stored
 	 * @return a ZooKeeperStateHandleStore instance
 	 * @throws Exception if ZooKeeper could not create the provided state handle store path in
@@ -79,8 +78,7 @@ public void close(boolean cleanup) throws Exception {
 	 */
 	public <T extends Serializable> ZooKeeperStateHandleStore<T> createZooKeeperStateHandleStore(
 			String zkStateHandleStorePath,
-			RetrievableStateStorageHelper<T> stateStorageHelper,
-			Executor executor) throws Exception {
+			RetrievableStateStorageHelper<T> stateStorageHelper) throws Exception {
 
 		facade.newNamespaceAwareEnsurePath(zkStateHandleStorePath).ensure(facade.getZookeeperClient());
 		CuratorFramework stateHandleStoreFacade = facade.usingNamespace(
@@ -88,7 +86,7 @@ public void close(boolean cleanup) throws Exception {
 				facade.getNamespace(),
 				zkStateHandleStorePath));
 
-		return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper, executor);
+		return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper);
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 81569649663..c4d89030dc3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -193,7 +193,7 @@ public void testDiscardAllCheckpoints() throws Exception {
 
 	// ---------------------------------------------------------------------------------------------
 
-	protected TestCompletedCheckpoint createCheckpoint(
+	public static TestCompletedCheckpoint createCheckpoint(
 		int id,
 		SharedStateRegistry sharedStateRegistry) throws IOException {
 
@@ -226,7 +226,12 @@ protected void verifyCheckpointRegistered(Collection<OperatorState> operatorStat
 		}
 	}
 
-	protected void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
+	public static void verifyCheckpointDiscarded(TestCompletedCheckpoint completedCheckpoint) {
+		assertTrue(completedCheckpoint.isDiscarded());
+		verifyCheckpointDiscarded(completedCheckpoint.getOperatorStates().values());
+	}
+
+	protected static void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
 		for (OperatorState operatorState : operatorStates) {
 			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
 				Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).discarded);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
new file mode 100644
index 00000000000..1f7d3691e50
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.ErrorListenerPathable;
+import org.apache.curator.utils.EnsurePath;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Mockito based tests for the {@link ZooKeeperStateHandleStore}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
+public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
+
+	/**
+	 * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
+	 * and ignores those which cannot be retrieved via their state handles.
+	 *
+	 * <p>We have a timeout in case the ZooKeeper store gets into a deadlock/livelock situation.
+	 */
+	@Test(timeout = 50000)
+	public void testCheckpointRecovery() throws Exception {
+		final JobID jobID = new JobID();
+		final long checkpoint1Id = 1L;
+		final long checkpoint2Id = 2;
+		final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4);
+
+		final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
+		expectedCheckpointIds.add(1L);
+		expectedCheckpointIds.add(2L);
+
+		final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
+		when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
+
+		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
+		when(retrievableStateHandle1.retrieveState()).then(
+			(invocation) -> new CompletedCheckpoint(
+				jobID,
+				checkpoint1Id,
+				1L,
+				1L,
+				new HashMap<>(),
+				null,
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+				new TestCompletedCheckpointStorageLocation()));
+
+		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class);
+		when(retrievableStateHandle2.retrieveState()).then(
+			(invocation -> new CompletedCheckpoint(
+				jobID,
+				checkpoint2Id,
+				2L,
+				2L,
+				new HashMap<>(),
+				null,
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+				new TestCompletedCheckpointStorageLocation())));
+
+		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
+		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
+		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
+		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
+
+		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
+		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
+		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
+		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
+
+		final int numCheckpointsToRetain = 1;
+
+		// Mocking for the delete operation on the CuratorFramework client
+		// It assures that the callback is executed synchronously
+
+		final EnsurePath ensurePathMock = mock(EnsurePath.class);
+		final CuratorEvent curatorEventMock = mock(CuratorEvent.class);
+		when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE);
+		when(curatorEventMock.getResultCode()).thenReturn(0);
+		when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock);
+
+		when(
+			client
+				.delete()
+				.inBackground(any(BackgroundCallback.class), any(Executor.class))
+		).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
+			@Override
+			public ErrorListenerPathable<Void> answer(InvocationOnMock invocation) throws Throwable {
+				final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0];
+
+				ErrorListenerPathable<Void> result = mock(ErrorListenerPathable.class);
+
+				when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
+					@Override
+					public Void answer(InvocationOnMock invocation) throws Throwable {
+
+						callback.processResult(client, curatorEventMock);
+
+						return null;
+					}
+				});
+
+				return result;
+			}
+		});
+
+		final String checkpointsPath = "foobar";
+		final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
+			numCheckpointsToRetain,
+			client,
+			checkpointsPath,
+			stateStorage,
+			Executors.directExecutor());
+
+		zooKeeperCompletedCheckpointStore.recover();
+
+		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
+
+		// check that we return the latest retrievable checkpoint
+		// this should remove the latest checkpoint because it is broken
+		assertEquals(checkpoint2Id, latestCompletedCheckpoint.getCheckpointID());
+
+		// this should remove the second broken checkpoint because we're iterating over all checkpoints
+		List<CompletedCheckpoint> completedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+
+		Collection<Long> actualCheckpointIds = new HashSet<>(completedCheckpoints.size());
+
+		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
+			actualCheckpointIds.add(completedCheckpoint.getCheckpointID());
+		}
+
+		assertEquals(expectedCheckpointIds, actualCheckpointIds);
+
+		// check that we did not discard any of the state handles
+		verify(retrievableStateHandle1, never()).discardState();
+		verify(retrievableStateHandle2, never()).discardState();
+
+		// Make sure that we also didn't discard any of the broken handles. Only when checkpoints
+		// are subsumed should they be discarded.
+		verify(failingRetrievableStateHandle, never()).discardState();
+	}
+
+	/**
+	 * Tests that the checkpoint does not exist in the store when we fail to add
+	 * it into the store (i.e., when the method throws an exception).
+	 */
+	@Test
+	public void testAddCheckpointWithFailedRemove() throws Exception {
+		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
+		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
+			spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
+		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
+
+		doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
+			@Override
+			public RetrievableStateHandle<CompletedCheckpoint> answer(InvocationOnMock invocationOnMock) throws Throwable {
+				CompletedCheckpoint checkpoint = (CompletedCheckpoint) invocationOnMock.getArguments()[1];
+
+				RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle = mock(RetrievableStateHandle.class);
+				when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
+
+				return retrievableStateHandle;
+			}
+		}).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class));
+
+		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString());
+
+		final int numCheckpointsToRetain = 1;
+		final String checkpointsPath = "foobar";
+		final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
+			numCheckpointsToRetain,
+			client,
+			checkpointsPath,
+			stateStorage,
+			Executors.directExecutor());
+
+		for (long i = 0; i <= numCheckpointsToRetain; ++i) {
+			CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
+			doReturn(i).when(checkpointToAdd).getCheckpointID();
+			doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
+
+			try {
+				zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
+
+				// The checkpoint should be in the store if we successfully add it into the store.
+				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+				assertTrue(addedCheckpoints.contains(checkpointToAdd));
+			} catch (Exception e) {
+				// The checkpoint should not be in the store if any exception is thrown.
+				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+				assertFalse(addedCheckpoints.contains(checkpointToAdd));
+			}
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 0384733fdb1..f992d3b00c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,60 +18,39 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
-import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.ErrorListenerPathable;
-import org.apache.curator.utils.EnsurePath;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.io.Serializable;
 import java.util.List;
-import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for {@link ZooKeeperCompletedCheckpointStore}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
 public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 
+	@ClassRule
+	public static ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
+
 	@Test
 	public void testPathConversion() {
 		final long checkpointId = 42L;
@@ -82,188 +61,103 @@ public void testPathConversion() {
 	}
 
 	/**
-	 * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
-	 * and ignores those which cannot be retrieved via their state handles.
-	 *
-	 * <p>We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation.
+	 * Tests that subsumed checkpoints are discarded.
 	 */
-	@Test(timeout = 50000)
-	public void testCheckpointRecovery() throws Exception {
-		final JobID jobID = new JobID();
-		final long checkpoint1Id = 1L;
-		final long checkpoint2Id = 2;
-		final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4);
-
-		final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
-		expectedCheckpointIds.add(1L);
-		expectedCheckpointIds.add(2L);
-
-		final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
-		when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
-
-		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
-		when(retrievableStateHandle1.retrieveState()).then(
-			(invocation) -> new CompletedCheckpoint(
-				jobID,
-				checkpoint1Id,
-				1L,
-				1L,
-				new HashMap<>(),
-				null,
-				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-				new TestCompletedCheckpointStorageLocation()));
-
-		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class);
-		when(retrievableStateHandle2.retrieveState()).then(
-			(invocation -> new CompletedCheckpoint(
-				jobID,
-				checkpoint2Id,
-				2L,
-				2L,
-				new HashMap<>(),
-				null,
-				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-				new TestCompletedCheckpointStorageLocation())));
-
-		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
-		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
-		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
-		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
-
-		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
-		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
-
-		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
-		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
-
-		final int numCheckpointsToRetain = 1;
-
-		// Mocking for the delete operation on the CuratorFramework client
-		// It assures that the callback is executed synchronously
-
-		final EnsurePath ensurePathMock = mock(EnsurePath.class);
-		final CuratorEvent curatorEventMock = mock(CuratorEvent.class);
-		when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE);
-		when(curatorEventMock.getResultCode()).thenReturn(0);
-		when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock);
-
-		when(
-			client
-				.delete()
-				.inBackground(any(BackgroundCallback.class), any(Executor.class))
-		).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
-			@Override
-			public ErrorListenerPathable<Void> answer(InvocationOnMock invocation) throws Throwable {
-				final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0];
+	@Test
+	public void testDiscardingSubsumedCheckpoints() throws Exception {
+		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
+
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperCompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client);
+
+		try {
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
+
+			checkpointStore.addCheckpoint(checkpoint1);
+			assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));
+
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint2 = CompletedCheckpointStoreTest.createCheckpoint(1, sharedStateRegistry);
+			checkpointStore.addCheckpoint(checkpoint2);
+			final List<CompletedCheckpoint> allCheckpoints = checkpointStore.getAllCheckpoints();
+			assertThat(allCheckpoints, Matchers.contains(checkpoint2));
+			assertThat(allCheckpoints, Matchers.not(Matchers.contains(checkpoint1)));
+
+			// verify that the subsumed checkpoint is discarded
+			CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+		} finally {
+			client.close();
+		}
+	}
 
-				ErrorListenerPathable<Void> result = mock(ErrorListenerPathable.class);
+	/**
+	 * Tests that checkpoints are discarded when the completed checkpoint store is shut
+	 * down with a globally terminal state.
+	 */
+	@Test
+	public void testDiscardingCheckpointsAtShutDown() throws Exception {
+		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
 
-				when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
-					@Override
-					public Void answer(InvocationOnMock invocation) throws Throwable {
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperCompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client);
 
-						callback.processResult(client, curatorEventMock);
+		try {
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
 
-						return null;
-					}
-				});
+			checkpointStore.addCheckpoint(checkpoint1);
+			assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));
 
-				return result;
-			}
-		});
+			checkpointStore.shutdown(JobStatus.FINISHED);
 
-		final String checkpointsPath = "foobar";
-		final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);
+			// verify that the checkpoint is discarded
+			CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+		} finally {
+			client.close();
+		}
+	}
 
-		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-			numCheckpointsToRetain,
+	@Nonnull
+	private ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore(CuratorFramework client) throws Exception {
+		return new ZooKeeperCompletedCheckpointStore(
+			1,
 			client,
-			checkpointsPath,
-			stateStorage,
+			"/checkpoints",
+			new TestingRetrievableStateStorageHelper<>(),
 			Executors.directExecutor());
+	}
 
-		zooKeeperCompletedCheckpointStore.recover();
-
-		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
-
-		// check that we return the latest retrievable checkpoint
-		// this should remove the latest checkpoint because it is broken
-		assertEquals(checkpoint2Id, latestCompletedCheckpoint.getCheckpointID());
-
-		// this should remove the second broken checkpoint because we're iterating over all checkpoints
-		List<CompletedCheckpoint> completedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-
-		Collection<Long> actualCheckpointIds = new HashSet<>(completedCheckpoints.size());
-
-		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
-			actualCheckpointIds.add(completedCheckpoint.getCheckpointID());
+	private static final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
+		@Override
+		public RetrievableStateHandle<T> store(T state) {
+			return new TestingRetrievableStateHandle<>(state);
 		}
 
-		assertEquals(expectedCheckpointIds, actualCheckpointIds);
-
-		// check that we did not discard any of the state handles
-		verify(retrievableStateHandle1, never()).discardState();
-		verify(retrievableStateHandle2, never()).discardState();
+		private static class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
 
-		// Make sure that we also didn't discard any of the broken handles. Only when checkpoints
-		// are subsumed should they be discarded.
-		verify(failingRetrievableStateHandle, never()).discardState();
-	}
+			private static final long serialVersionUID = 137053380713794300L;
 
-	/**
-	 * Tests that the checkpoint does not exist in the store when we fail to add
-	 * it into the store (i.e., there exists an exception thrown by the method).
-	 */
-	@Test
-	public void testAddCheckpointWithFailedRemove() throws Exception {
-		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
-		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+			private final T state;
 
-		ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
-			spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
-		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
+			private TestingRetrievableStateHandle(T state) {
+				this.state = state;
+			}
 
-		doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
 			@Override
-			public RetrievableStateHandle<CompletedCheckpoint> answer(InvocationOnMock invocationOnMock) throws Throwable {
-				CompletedCheckpoint checkpoint = (CompletedCheckpoint) invocationOnMock.getArguments()[1];
-
-				RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle = mock(RetrievableStateHandle.class);
-				when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
-
-				return retrievableStateHandle;
+			public T retrieveState() throws IOException, ClassNotFoundException {
+				return state;
 			}
-		}).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class));
-
-		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(), any(ZooKeeperStateHandleStore.RemoveCallback.class));
 
-		final int numCheckpointsToRetain = 1;
-		final String checkpointsPath = "foobar";
-		final RetrievableStateStorageHelper<CompletedCheckpoint> stateSotrage = mock(RetrievableStateStorageHelper.class);
-
-		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-			numCheckpointsToRetain,
-			client,
-			checkpointsPath,
-			stateSotrage,
-			Executors.directExecutor());
+			@Override
+			public void discardState() throws Exception {
+				// no op
+			}
 
-		for (long i = 0; i <= numCheckpointsToRetain; ++i) {
-			CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
-			doReturn(i).when(checkpointToAdd).getCheckpointID();
-			doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-
-			try {
-				zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
-
-				// The checkpoint should be in the store if we successfully add it into the store.
-				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-				assertTrue(addedCheckpoints.contains(checkpointToAdd));
-			} catch (Exception e) {
-				// The checkpoint should not be in the store if any exception is thrown.
-				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-				assertFalse(addedCheckpoints.contains(checkpointToAdd));
+			@Override
+			public long getStateSize() {
+				return 0;
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index c1a7b536721..e9be145c37f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -20,7 +20,6 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
@@ -90,8 +89,7 @@ public void testPutAndRemoveJobGraph() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
 			ZooKeeper.createClient(),
 			"/testPutAndRemoveJobGraph",
-			localStateStorage,
-			Executors.directExecutor());
+			localStateStorage);
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -147,7 +145,7 @@ public void testPutAndRemoveJobGraph() throws Exception {
 	@Test
 	public void testRecoverJobGraphs() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -198,10 +196,10 @@ public void testConcurrentAddJobGraph() throws Exception {
 
 		try {
 			jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
 
 			otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
 
 
 			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
@@ -257,10 +255,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
 	@Test(expected = IllegalStateException.class)
 	public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
 		ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
 		jobGraphs.start(null);
 		otherJobGraphs.start(null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
new file mode 100644
index 00000000000..c4c56949cd9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * {@link ExternalResource} which starts a {@link org.apache.zookeeper.server.ZooKeeperServer}.
+ */
+public class ZooKeeperResource extends ExternalResource {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperResource.class);
+
+	@Nullable
+	private TestingServer zooKeeperServer;
+
+	public String getConnectString() {
+		verifyIsRunning();
+		return zooKeeperServer.getConnectString();
+	}
+
+	private void verifyIsRunning() {
+		Preconditions.checkState(zooKeeperServer != null);
+	}
+
+	@Override
+	protected void before() throws Throwable {
+		terminateZooKeeperServer();
+		zooKeeperServer = new TestingServer(true);
+	}
+
+	private void terminateZooKeeperServer() throws IOException {
+		if (zooKeeperServer != null) {
+			zooKeeperServer.stop();
+			zooKeeperServer = null;
+		}
+	}
+
+	@Override
+	protected void after() {
+		try {
+			terminateZooKeeperServer();
+		} catch (IOException e) {
+			LOG.warn("Could not properly terminate the {}.", getClass().getSimpleName(), e);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index fd39b25991c..2dd27e7c897 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -18,21 +18,19 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,7 +39,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -49,12 +46,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -88,8 +80,8 @@ public void cleanUp() throws Exception {
 	@Test
 	public void testAddAndLock() throws Exception {
 		LongStateStorage longStateStorage = new LongStateStorage();
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZOOKEEPER.getClient(), longStateStorage, Executors.directExecutor());
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+			ZOOKEEPER.getClient(), longStateStorage);
 
 		// Config
 		final String pathInZooKeeper = "/testAdd";
@@ -136,7 +128,7 @@ public void testAddAlreadyExistingPath() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath");
 
@@ -161,7 +153,7 @@ public void testAddDiscardStateHandleAfterFailure() throws Exception {
 		when(client.inTransaction().create()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
+				client, stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
@@ -191,7 +183,7 @@ public void testReplace() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testReplace";
@@ -230,7 +222,7 @@ public void testReplaceNonExistingPath() throws Exception {
 		RetrievableStateStorageHelper<Long> stateStorage = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateStorage, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateStorage);
 
 		store.replace("/testReplaceNonExistingPath", 0, 1L);
 	}
@@ -247,7 +239,7 @@ public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
 		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
+				client, stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
@@ -289,7 +281,7 @@ public void testGetAndExists() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testGetAndExists";
@@ -314,7 +306,7 @@ public void testGetNonExistingPath() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		store.getAndLock("/testGetNonExistingPath");
 	}
@@ -328,7 +320,7 @@ public void testGetAll() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testGetAll";
@@ -359,7 +351,7 @@ public void testGetAllSortedByName() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String basePath = "/testGetAllSortedByName";
@@ -393,7 +385,7 @@ public void testRemove() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testRemove";
@@ -401,50 +393,14 @@ public void testRemove() throws Exception {
 
 		store.addAndLock(pathInZooKeeper, state);
 
+		final int numberOfGlobalDiscardCalls = LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls();
+
 		// Test
 		store.releaseAndTryRemove(pathInZooKeeper);
 
 		// Verify discarded
 		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
-	}
-
-	/**
-	 * Tests that state handles are correctly removed with a callback.
-	 */
-	@Test
-	public void testRemoveWithCallback() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testRemoveWithCallback";
-		final Long state = 27255442L;
-
-		store.addAndLock(pathInZooKeeper, state);
-
-		final CountDownLatch sync = new CountDownLatch(1);
-		ZooKeeperStateHandleStore.RemoveCallback<Long> callback = mock(ZooKeeperStateHandleStore.RemoveCallback.class);
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				sync.countDown();
-				return null;
-			}
-		}).when(callback).apply(any(RetrievableStateHandle.class));
-
-		// Test
-		store.releaseAndTryRemove(pathInZooKeeper, callback);
-
-		// Verify discarded and callback called
-		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
-
-		sync.await();
-
-		verify(callback, times(1))
-				.apply(any(RetrievableStateHandle.class));
+		assertEquals(numberOfGlobalDiscardCalls + 1, LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls());
 	}
 
 	/** Tests that all state handles are correctly discarded. */
@@ -454,7 +410,7 @@ public void testReleaseAndTryRemoveAll() throws Exception {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testDiscardAll";
@@ -486,8 +442,7 @@ public void testCorruptedData() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			stateStorage,
-			Executors.directExecutor());
+			stateStorage);
 
 		final Collection<Long> input = new HashSet<>();
 		input.add(1L);
@@ -543,13 +498,11 @@ public void testConcurrentDeleteOperation() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String statePath = "/state";
 
@@ -586,13 +539,11 @@ public void testLockCleanupWhenGetAndLockFails() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String path = "/state";
 
@@ -649,8 +600,7 @@ public void testLockCleanupWhenClientTimesOut() throws Exception {
 
 			ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 				client,
-				longStateStorage,
-				Executors.directExecutor());
+				longStateStorage);
 
 			final String path = "/state";
 
@@ -682,8 +632,7 @@ public void testRelease() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String path = "/state";
 
@@ -720,8 +669,7 @@ public void testReleaseAll() throws Exception {
 
 		ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final Collection<String> paths = Arrays.asList("/state1", "/state2", "/state3");
 
@@ -775,9 +723,11 @@ public void testReleaseAll() throws Exception {
 
 		private static final long serialVersionUID = -3555329254423838912L;
 
+		private static int numberOfGlobalDiscardCalls = 0;
+
 		private final Long state;
 
-		private int numberOfDiscardCalls;
+		private int numberOfDiscardCalls = 0;
 
 		public LongRetrievableStateHandle(Long state) {
 			this.state = state;
@@ -790,6 +740,7 @@ public Long retrieveState() {
 
 		@Override
 		public void discardState() throws Exception {
+			numberOfGlobalDiscardCalls++;
 			numberOfDiscardCalls++;
 		}
 
@@ -798,8 +749,12 @@ public long getStateSize() {
 			return 0;
 		}
 
-		public int getNumberOfDiscardCalls() {
+		int getNumberOfDiscardCalls() {
 			return numberOfDiscardCalls;
 		}
+
+		public static int getNumberOfGlobalDiscardCalls() {
+			return numberOfGlobalDiscardCalls;
+		}
 	}
 }


 



> Make ZooKeeperStateHandleStore#releaseAndTryRemove synchronous
> --------------------------------------------------------------
>
>                 Key: FLINK-10185
>                 URL: https://issues.apache.org/jira/browse/FLINK-10185
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.4.2, 1.5.2, 1.6.0, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.4.3, 1.6.1, 1.7.0, 1.5.4
>
>
> The {{ZooKeeperStateHandleStore#releaseAndTryRemove}} method executes part of its logic synchronously (retrieving the state handle and unlocking the ZNode) and part asynchronously (removing the ZNode). Moreover, the method takes a callback parameter that is invoked after a successful removal. This was done so that a potentially blocking state discard operation could run in a different thread.
> I think this can be simplified by executing all logic in the same thread and running the callback logic after {{ZooKeeperStateHandleStore#releaseAndTryRemove}} has returned. Moreover, if the operation needs to be non-blocking, the caller can invoke this method from a different thread.
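
The calling pattern proposed in the description can be sketched roughly as follows. This is a minimal illustration and not Flink's implementation: InMemoryStore is a hypothetical in-memory stand-in for the store, and removeBlocking, removeAsync and the boolean return value are assumptions made for the example. It shows the follow-up logic running in the calling thread once the removal has returned, and a caller that wants non-blocking behaviour supplying its own executor.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class SyncRemoveSketch {

	/** Hypothetical in-memory stand-in for a state handle store; not Flink's class. */
	static final class InMemoryStore {
		private final Map<String, Long> entries = new ConcurrentHashMap<>();

		void add(String path, Long state) {
			entries.put(path, state);
		}

		/** Removes the entry in the calling thread; returns true if an entry was removed. */
		boolean releaseAndTryRemove(String path) {
			return entries.remove(path) != null;
		}

		int size() {
			return entries.size();
		}
	}

	/** Blocking variant: the follow-up logic runs only after the removal has returned. */
	static boolean removeBlocking(InMemoryStore store, String path, Runnable onSuccess) {
		boolean removed = store.releaseAndTryRemove(path);
		if (removed) {
			onSuccess.run();
		}
		return removed;
	}

	/** Non-blocking variant: the caller decides which executor runs the blocking call. */
	static CompletableFuture<Boolean> removeAsync(
			InMemoryStore store, String path, Runnable onSuccess, Executor executor) {
		return CompletableFuture.supplyAsync(() -> removeBlocking(store, path, onSuccess), executor);
	}

	public static void main(String[] args) {
		InMemoryStore store = new InMemoryStore();
		store.add("/a", 1L);
		store.add("/b", 2L);

		AtomicInteger followUps = new AtomicInteger();

		// Same thread: removal and follow-up logic complete before this call returns.
		boolean removedA = removeBlocking(store, "/a", followUps::incrementAndGet);

		// Different thread: identical logic, but executed by a caller-provided executor.
		ExecutorService executor = Executors.newSingleThreadExecutor();
		try {
			boolean removedB = removeAsync(store, "/b", followUps::incrementAndGet, executor).join();
			System.out.println(removedA + " " + removedB + " " + followUps.get() + " " + store.size());
		} finally {
			executor.shutdown();
		}
	}
}
```

With this shape the store itself needs no executor, which is in line with the constructor change visible throughout the diff above; the threading decision moves to the caller.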


