You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/12 14:43:26 UTC

[flink] 03/08: [FLINK-10185] Make ZooKeeperStateHandleStore#releaseAndTryRemove synchronous

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a0b668555f5ee407d3cdbc42113cc28f019fc318
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Aug 20 11:19:18 2018 +0200

    [FLINK-10185] Make ZooKeeperStateHandleStore#releaseAndTryRemove synchronous
    
    Remove the asynchronous callback from ZooKeeperStateHandleStore#releaseAndTryRemove.
    Instead we can execute the callback after having executed the releaseAndTryRemove
    method successfully. This separates concerns better because we don't mix storage
    with business logic. Furthermore, we can still avoid blocking operations if we use a
    separate thread to call into ZooKeeperStateHandleStore#releaseAndTryRemove.
    
    This closes #6586.
---
 .../services/ZooKeeperMesosServices.java           |   3 +-
 .../ZooKeeperCompletedCheckpointStore.java         | 101 +++-----
 .../zookeeper/ZooKeeperHaServices.java             |   2 +-
 .../ZooKeeperSubmittedJobGraphStore.java           |   7 +-
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |   8 +-
 .../zookeeper/ZooKeeperStateHandleStore.java       | 156 ++---------
 .../runtime/zookeeper/ZooKeeperUtilityFactory.java |  14 +-
 .../checkpoint/CompletedCheckpointStoreTest.java   |   9 +-
 ...KeeperCompletedCheckpointStoreMockitoTest.java} |  19 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java     | 288 +++++++--------------
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java    |  14 +-
 .../flink/runtime/zookeeper/ZooKeeperResource.java |  72 ++++++
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 115 +++-----
 13 files changed, 283 insertions(+), 525 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb83..45d1141 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public class ZooKeeperMesosServices extends AbstractMesosServices {
 
 		ZooKeeperStateHandleStore<MesosWorkerStore.Worker> zooKeeperStateHandleStore = zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
 			"/workers",
-			stateStorageHelper,
-			executor);
+			stateStorageHelper);
 
 		ZooKeeperSharedValue frameworkId = zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
 		ZooKeeperSharedCount totalTaskCount = zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index deb1ab3..1317339 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,14 +25,13 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ConsumerWithException;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +85,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 */
 	private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
+	private final Executor executor;
+
 	/**
 	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 	 *
@@ -98,7 +99,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 *                                       start with a '/')
 	 * @param stateStorage                   State storage to be used to persist the completed
 	 *                                       checkpoint
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
+	 * @param executor to execute blocking calls
 	 * @throws Exception
 	 */
 	public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +124,12 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		// All operations will have the path as root
 		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
 
-		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
+		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);
 
 		this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 
+		this.executor = checkNotNull(executor);
+
 		LOG.info("Initialized in '{}'.", checkpointsPath);
 	}
 
@@ -236,16 +239,30 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 
 		// Everything worked, let's remove a previous checkpoint if necessary.
 		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
-			try {
-				removeSubsumed(completedCheckpoints.removeFirst());
-			} catch (Exception e) {
-				LOG.warn("Failed to subsume the old checkpoint", e);
-			}
+			final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
+			tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume);
 		}
 
 		LOG.debug("Added {} to {}.", checkpoint, path);
 	}
 
+	private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> discardCallback) {
+		try {
+			if (tryRemove(completedCheckpoint.getCheckpointID())) {
+				executor.execute(() -> {
+					try {
+						discardCallback.accept(completedCheckpoint);
+					} catch (Exception e) {
+						LOG.warn("Could not discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
+					}
+				});
+
+			}
+		} catch (Exception e) {
+			LOG.warn("Failed to subsume the old checkpoint", e);
+		}
+	}
+
 	@Override
 	public CompletedCheckpoint getLatestCheckpoint() {
 		if (completedCheckpoints.isEmpty()) {
@@ -278,11 +295,9 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			LOG.info("Shutting down");
 
 			for (CompletedCheckpoint checkpoint : completedCheckpoints) {
-				try {
-					removeShutdown(checkpoint, jobStatus);
-				} catch (Exception e) {
-					LOG.error("Failed to discard checkpoint.", e);
-				}
+				tryRemoveCompletedCheckpoint(
+					checkpoint,
+					completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus));
 			}
 
 			completedCheckpoints.clear();
@@ -305,59 +320,13 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Removes a subsumed checkpoint from ZooKeeper and drops the state.
-	 */
-	private void removeSubsumed(
-		final CompletedCheckpoint completedCheckpoint) throws Exception {
-
-		if (completedCheckpoint == null) {
-			return;
-		}
-
-		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action =
-			new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-				@Override
-				public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-					if (value != null) {
-						try {
-							completedCheckpoint.discardOnSubsume();
-						} catch (Exception e) {
-							throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
-						}
-					}
-				}
-			};
-
-		checkpointsInZooKeeper.releaseAndTryRemove(
-			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-			action);
-	}
-
-	/**
-	 * Removes a checkpoint from ZooKeeper because of Job shutdown and drops the state.
+	 * Tries to remove the checkpoint identified by the given checkpoint id.
+	 *
+	 * @param checkpointId identifying the checkpoint to remove
+	 * @return true if the checkpoint could be removed
 	 */
-	private void removeShutdown(
-			final CompletedCheckpoint completedCheckpoint,
-			final JobStatus jobStatus) throws Exception {
-
-		if (completedCheckpoint == null) {
-			return;
-		}
-
-		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-			@Override
-			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-				try {
-					completedCheckpoint.discardOnShutdown(jobStatus);
-				} catch (Exception e) {
-					throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
-				}
-			}
-		};
-
-		checkpointsInZooKeeper.releaseAndTryRemove(
-			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-			removeAction);
+	private boolean tryRemove(long checkpointId) throws Exception {
+		return checkpointsInZooKeeper.releaseAndTryRemove(checkpointIdToPath(checkpointId));
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 3882479..ea96d7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -179,7 +179,7 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
 
 	@Override
 	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
-		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
+		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 7ba5d48..0510815 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -41,7 +41,6 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -99,14 +98,12 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	 * @param client ZooKeeper client
 	 * @param currentJobsPath ZooKeeper path for current job graphs
 	 * @param stateStorage State storage used to persist the submitted jobs
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
 	 * @throws Exception
 	 */
 	public ZooKeeperSubmittedJobGraphStore(
 			CuratorFramework client,
 			String currentJobsPath,
-			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage,
-			Executor executor) throws Exception {
+			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
 
 		checkNotNull(currentJobsPath, "Current jobs path");
 		checkNotNull(stateStorage, "State storage");
@@ -123,7 +120,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
 		this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
-		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
+		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
 		pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 43c930e..d9c9161 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -227,14 +227,12 @@ public class ZooKeeperUtils {
 	 *
 	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 	 * @param configuration {@link Configuration} object
-	 * @param executor to run ZooKeeper callbacks
 	 * @return {@link ZooKeeperSubmittedJobGraphStore} instance
 	 * @throws Exception if the submitted job graph store cannot be created
 	 */
 	public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
 			CuratorFramework client,
-			Configuration configuration,
-			Executor executor) throws Exception {
+			Configuration configuration) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -244,7 +242,9 @@ public class ZooKeeperUtils {
 		String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
-				client, zooKeeperSubmittedJobsPath, stateStorage, executor);
+			client,
+			zooKeeperSubmittedJobsPath,
+			stateStorage);
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index e151a11..8c3d31f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -21,14 +21,9 @@ package org.apache.flink.runtime.zookeeper;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -46,7 +41,6 @@ import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -91,8 +85,6 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 	private final RetrievableStateStorageHelper<T> storage;
 
-	private final Executor executor;
-
 	/** Lock node name of this ZooKeeperStateHandleStore. The name should be unique among all other state handle stores. */
 	private final String lockNode;
 
@@ -105,16 +97,13 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 *                            instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
 	 * @param storage to persist the actual state and whose returned state handle is then written
 	 *                to ZooKeeper
-	 * @param executor to run the ZooKeeper callbacks
 	 */
 	public ZooKeeperStateHandleStore(
 		CuratorFramework client,
-		RetrievableStateStorageHelper<T> storage,
-		Executor executor) {
+		RetrievableStateStorageHelper<T> storage) {
 
 		this.client = checkNotNull(client, "Curator client");
 		this.storage = checkNotNull(storage, "State storage");
-		this.executor = checkNotNull(executor);
 
 		// Generate a unique lock node name
 		lockNode = UUID.randomUUID().toString();
@@ -395,33 +384,14 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 	/**
 	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
-	 * The deletion of the state node is executed asynchronously.
-	 *
-	 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
-	 * has been executed.
-	 *
-	 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
-	 * @throws Exception If the ZooKeeper operation fails
-	 */
-	public void releaseAndTryRemove(String pathInZooKeeper) throws Exception {
-		releaseAndTryRemove(pathInZooKeeper, null);
-	}
-
-	/**
-	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
-	 * The deletion of the state node is executed asynchronously. After the state node has been deleted, the given
-	 * callback is called with the {@link RetrievableStateHandle} of the deleted state node.
-	 *
-	 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
-	 * has been executed.
+	 * If the state node could be removed, the stored {@link RetrievableStateHandle}, if any, is discarded.
 	 *
 	 * @param pathInZooKeeper Path of state handle to remove
-	 * @param callback The callback to execute after a successful deletion. Null if no action needs to be executed.
-	 * @throws Exception If the ZooKeeper operation fails
+	 * @return True if the state node could be removed; false if it is still locked
+	 * @throws Exception If the ZooKeeper operation or discarding the state handle fails
 	 */
-	public void releaseAndTryRemove(
-			String pathInZooKeeper,
-			@Nullable final RemoveCallback<T> callback) throws Exception {
+	@Nullable
+	public boolean releaseAndTryRemove(String pathInZooKeeper) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
 		final String path = normalizePath(pathInZooKeeper);
@@ -431,14 +401,23 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		try {
 			stateHandle = get(path, false);
 		} catch (Exception e) {
-			LOG.warn("Could not retrieve the state handle from node " + path + '.', e);
+			LOG.warn("Could not retrieve the state handle from node {}.", path, e);
 		}
 
 		release(pathInZooKeeper);
 
-		final BackgroundCallback backgroundCallback = new RemoveBackgroundCallback<>(stateHandle, callback, path);
+		try {
+			client.delete().forPath(path);
+		} catch (KeeperException.NotEmptyException ignored) {
+			LOG.debug("Could not delete znode {} because it is still locked.", path);
+			return false;
+		}
+
+		if (stateHandle != null) {
+			stateHandle.discardState();
+		}
 
-		client.delete().inBackground(backgroundCallback, executor).forPath(path);
+		return true;
 	}
 
 	/**
@@ -597,103 +576,4 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 			return '/' + path;
 		}
 	}
-
-	// ---------------------------------------------------------------------------------------------------------
-	// Utility classes
-	// ---------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Callback which is executed when removing a node from ZooKeeper. The callback will call the given
-	 * {@link RemoveCallback} if it is not null. Afterwards, it will discard the given {@link RetrievableStateHandle}
-	 * if it is not null.
-	 *
-	 * @param <T> Type of the value stored in the RetrievableStateHandle
-	 */
-	private static final class RemoveBackgroundCallback<T extends Serializable> implements BackgroundCallback {
-		@Nullable
-		private final RetrievableStateHandle<T> stateHandle;
-
-		@Nullable
-		private final RemoveCallback<T> callback;
-
-		private final String pathInZooKeeper;
-
-		private RemoveBackgroundCallback(
-			@Nullable RetrievableStateHandle<T> stateHandle,
-			@Nullable RemoveCallback<T> callback,
-			String pathInZooKeeper) {
-
-			this.stateHandle = stateHandle;
-			this.callback = callback;
-			this.pathInZooKeeper = Preconditions.checkNotNull(pathInZooKeeper);
-		}
-
-		@Override
-		public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
-			try {
-				if (event.getType() == CuratorEventType.DELETE) {
-					final KeeperException.Code resultCode = KeeperException.Code.get(event.getResultCode());
-
-					if (resultCode == KeeperException.Code.OK) {
-						Exception exception = null;
-
-						if (null != callback) {
-							try {
-								callback.apply(stateHandle);
-							} catch (Throwable e) {
-								exception = new Exception("Could not execute delete action for node " +
-									pathInZooKeeper + '.', e);
-							}
-						}
-
-						if (stateHandle != null) {
-							try {
-								// Discard the state handle
-								stateHandle.discardState();
-							} catch (Throwable e) {
-								Exception newException = new Exception("Could not discard state handle of node " +
-									pathInZooKeeper + '.', e);
-
-								if (exception == null) {
-									exception = newException;
-								} else {
-									exception.addSuppressed(newException);
-								}
-							}
-						}
-
-						if (exception != null) {
-							throw exception;
-						}
-					} else if (resultCode == KeeperException.Code.NOTEMPTY) {
-						// Could not delete the node because it still contains children/locks
-						LOG.debug("Could not delete node " + pathInZooKeeper + " because it is still locked.");
-					} else {
-						throw new IllegalStateException("Unexpected result code " +
-							resultCode.name() + " in '" + event + "' callback.");
-					}
-				} else {
-					throw new IllegalStateException("Unexpected event type " +
-						event.getType() + " in '" + event + "' callback.");
-				}
-			} catch (Exception e) {
-				LOG.warn("Failed to run callback for delete operation on node " + pathInZooKeeper + '.', e);
-			}
-
-		}
-	}
-
-	/**
-	 * Callback interface for remove calls.
-	 */
-	public interface RemoveCallback<T extends Serializable> {
-		/**
-		 * Callback method. The parameter can be null if the {@link RetrievableStateHandle} could not be retrieved
-		 * from ZooKeeper.
-		 *
-		 * @param value RetrievableStateHandle retrieved from ZooKeeper, null if it was not retrievable
-		 * @throws FlinkException If the callback failed
-		 */
-		void apply(@Nullable RetrievableStateHandle<T> value) throws FlinkException;
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
index d3b7dc5..3e294e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.shared.SharedCount;
-import org.apache.curator.framework.recipes.shared.SharedValue;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+
 import java.io.Serializable;
-import java.util.concurrent.Executor;
 
 /**
  * Creates ZooKeeper utility classes without exposing the {@link CuratorFramework} dependency. The
@@ -71,7 +71,6 @@ public class ZooKeeperUtilityFactory {
 	 *
 	 * @param zkStateHandleStorePath specifying the path in ZooKeeper to store the state handles to
 	 * @param stateStorageHelper storing the actual state data
-	 * @param executor to run asynchronous callbacks of the state handle store
 	 * @param <T> Type of the state to be stored
 	 * @return a ZooKeeperStateHandleStore instance
 	 * @throws Exception if ZooKeeper could not create the provided state handle store path in
@@ -79,8 +78,7 @@ public class ZooKeeperUtilityFactory {
 	 */
 	public <T extends Serializable> ZooKeeperStateHandleStore<T> createZooKeeperStateHandleStore(
 			String zkStateHandleStorePath,
-			RetrievableStateStorageHelper<T> stateStorageHelper,
-			Executor executor) throws Exception {
+			RetrievableStateStorageHelper<T> stateStorageHelper) throws Exception {
 
 		facade.newNamespaceAwareEnsurePath(zkStateHandleStorePath).ensure(facade.getZookeeperClient());
 		CuratorFramework stateHandleStoreFacade = facade.usingNamespace(
@@ -88,7 +86,7 @@ public class ZooKeeperUtilityFactory {
 				facade.getNamespace(),
 				zkStateHandleStorePath));
 
-		return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper, executor);
+		return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper);
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 8156964..c4d8903 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -193,7 +193,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 
 	// ---------------------------------------------------------------------------------------------
 
-	protected TestCompletedCheckpoint createCheckpoint(
+	public static TestCompletedCheckpoint createCheckpoint(
 		int id,
 		SharedStateRegistry sharedStateRegistry) throws IOException {
 
@@ -226,7 +226,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		}
 	}
 
-	protected void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
+	public static void verifyCheckpointDiscarded(TestCompletedCheckpoint completedCheckpoint) {
+		assertTrue(completedCheckpoint.isDiscarded());
+		verifyCheckpointDiscarded(completedCheckpoint.getOperatorStates().values());
+	}
+
+	protected static void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
 		for (OperatorState operatorState : operatorStates) {
 			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
 				Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).discarded);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
similarity index 95%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
copy to flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index 0384733..1f7d369 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -66,20 +66,11 @@ import static org.powermock.api.mockito.PowerMockito.doThrow;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
- * Tests for {@link ZooKeeperCompletedCheckpointStore}.
+ * Mockito based tests for the {@link ZooKeeperCompletedCheckpointStore}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
-public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
-
-	@Test
-	public void testPathConversion() {
-		final long checkpointId = 42L;
-
-		final String path = ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpointId);
-
-		assertEquals(checkpointId, ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path));
-	}
+public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
 
 	/**
 	 * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
@@ -133,7 +124,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
 		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
 
-		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
+		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
 		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
 		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
 
@@ -221,7 +212,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
 
 		ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
-			spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
+			spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
 		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
 
 		doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
@@ -236,7 +227,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 			}
 		}).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class));
 
-		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(), any(ZooKeeperStateHandleStore.RemoveCallback.class));
+		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString());
 
 		final int numCheckpointsToRetain = 1;
 		final String checkpointsPath = "foobar";
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 0384733..f992d3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,60 +18,39 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
-import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.ErrorListenerPathable;
-import org.apache.curator.utils.EnsurePath;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.io.Serializable;
 import java.util.List;
-import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for {@link ZooKeeperCompletedCheckpointStore}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
 public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 
+	@ClassRule
+	public static ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
+
 	@Test
 	public void testPathConversion() {
 		final long checkpointId = 42L;
@@ -82,188 +61,103 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
-	 * and ignores those which cannot be retrieved via their state handles.
-	 *
-	 * <p>We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation.
+	 * Tests that subsumed checkpoints are discarded.
 	 */
-	@Test(timeout = 50000)
-	public void testCheckpointRecovery() throws Exception {
-		final JobID jobID = new JobID();
-		final long checkpoint1Id = 1L;
-		final long checkpoint2Id = 2;
-		final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4);
-
-		final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
-		expectedCheckpointIds.add(1L);
-		expectedCheckpointIds.add(2L);
-
-		final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
-		when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
-
-		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
-		when(retrievableStateHandle1.retrieveState()).then(
-			(invocation) -> new CompletedCheckpoint(
-				jobID,
-				checkpoint1Id,
-				1L,
-				1L,
-				new HashMap<>(),
-				null,
-				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-				new TestCompletedCheckpointStorageLocation()));
-
-		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class);
-		when(retrievableStateHandle2.retrieveState()).then(
-			(invocation -> new CompletedCheckpoint(
-				jobID,
-				checkpoint2Id,
-				2L,
-				2L,
-				new HashMap<>(),
-				null,
-				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-				new TestCompletedCheckpointStorageLocation())));
-
-		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
-		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
-		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
-		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
-
-		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
-		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
-
-		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
-		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
-
-		final int numCheckpointsToRetain = 1;
-
-		// Mocking for the delete operation on the CuratorFramework client
-		// It assures that the callback is executed synchronously
-
-		final EnsurePath ensurePathMock = mock(EnsurePath.class);
-		final CuratorEvent curatorEventMock = mock(CuratorEvent.class);
-		when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE);
-		when(curatorEventMock.getResultCode()).thenReturn(0);
-		when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock);
-
-		when(
-			client
-				.delete()
-				.inBackground(any(BackgroundCallback.class), any(Executor.class))
-		).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
-			@Override
-			public ErrorListenerPathable<Void> answer(InvocationOnMock invocation) throws Throwable {
-				final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0];
+	@Test
+	public void testDiscardingSubsumedCheckpoints() throws Exception {
+		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
+
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperCompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client);
+
+		try {
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
+
+			checkpointStore.addCheckpoint(checkpoint1);
+			assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));
+
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint2 = CompletedCheckpointStoreTest.createCheckpoint(1, sharedStateRegistry);
+			checkpointStore.addCheckpoint(checkpoint2);
+			final List<CompletedCheckpoint> allCheckpoints = checkpointStore.getAllCheckpoints();
+			assertThat(allCheckpoints, Matchers.contains(checkpoint2));
+			assertThat(allCheckpoints, Matchers.not(Matchers.contains(checkpoint1)));
+
+			// verify that the subsumed checkpoint is discarded
+			CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+		} finally {
+			client.close();
+		}
+	}
 
-				ErrorListenerPathable<Void> result = mock(ErrorListenerPathable.class);
+	/**
+	 * Tests that checkpoints are discarded when the completed checkpoint store is shut
+	 * down with a globally terminal state.
+	 */
+	@Test
+	public void testDiscardingCheckpointsAtShutDown() throws Exception {
+		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
 
-				when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
-					@Override
-					public Void answer(InvocationOnMock invocation) throws Throwable {
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperCompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client);
 
-						callback.processResult(client, curatorEventMock);
+		try {
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
 
-						return null;
-					}
-				});
+			checkpointStore.addCheckpoint(checkpoint1);
+			assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));
 
-				return result;
-			}
-		});
+			checkpointStore.shutdown(JobStatus.FINISHED);
 
-		final String checkpointsPath = "foobar";
-		final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);
+			// verify that the checkpoint is discarded
+			CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+		} finally {
+			client.close();
+		}
+	}
 
-		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-			numCheckpointsToRetain,
+	@Nonnull
+	private ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore(CuratorFramework client) throws Exception {
+		return new ZooKeeperCompletedCheckpointStore(
+			1,
 			client,
-			checkpointsPath,
-			stateStorage,
+			"/checkpoints",
+			new TestingRetrievableStateStorageHelper<>(),
 			Executors.directExecutor());
+	}
 
-		zooKeeperCompletedCheckpointStore.recover();
-
-		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
-
-		// check that we return the latest retrievable checkpoint
-		// this should remove the latest checkpoint because it is broken
-		assertEquals(checkpoint2Id, latestCompletedCheckpoint.getCheckpointID());
-
-		// this should remove the second broken checkpoint because we're iterating over all checkpoints
-		List<CompletedCheckpoint> completedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-
-		Collection<Long> actualCheckpointIds = new HashSet<>(completedCheckpoints.size());
-
-		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
-			actualCheckpointIds.add(completedCheckpoint.getCheckpointID());
+	private static final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
+		@Override
+		public RetrievableStateHandle<T> store(T state) {
+			return new TestingRetrievableStateHandle<>(state);
 		}
 
-		assertEquals(expectedCheckpointIds, actualCheckpointIds);
-
-		// check that we did not discard any of the state handles
-		verify(retrievableStateHandle1, never()).discardState();
-		verify(retrievableStateHandle2, never()).discardState();
+		private static class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
 
-		// Make sure that we also didn't discard any of the broken handles. Only when checkpoints
-		// are subsumed should they be discarded.
-		verify(failingRetrievableStateHandle, never()).discardState();
-	}
+			private static final long serialVersionUID = 137053380713794300L;
 
-	/**
-	 * Tests that the checkpoint does not exist in the store when we fail to add
-	 * it into the store (i.e., there exists an exception thrown by the method).
-	 */
-	@Test
-	public void testAddCheckpointWithFailedRemove() throws Exception {
-		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
-		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+			private final T state;
 
-		ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
-			spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
-		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
+			private TestingRetrievableStateHandle(T state) {
+				this.state = state;
+			}
 
-		doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
 			@Override
-			public RetrievableStateHandle<CompletedCheckpoint> answer(InvocationOnMock invocationOnMock) throws Throwable {
-				CompletedCheckpoint checkpoint = (CompletedCheckpoint) invocationOnMock.getArguments()[1];
-
-				RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle = mock(RetrievableStateHandle.class);
-				when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
-
-				return retrievableStateHandle;
+			public T retrieveState() throws IOException, ClassNotFoundException {
+				return state;
 			}
-		}).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class));
-
-		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(), any(ZooKeeperStateHandleStore.RemoveCallback.class));
 
-		final int numCheckpointsToRetain = 1;
-		final String checkpointsPath = "foobar";
-		final RetrievableStateStorageHelper<CompletedCheckpoint> stateSotrage = mock(RetrievableStateStorageHelper.class);
-
-		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-			numCheckpointsToRetain,
-			client,
-			checkpointsPath,
-			stateSotrage,
-			Executors.directExecutor());
+			@Override
+			public void discardState() throws Exception {
+				// no op
+			}
 
-		for (long i = 0; i <= numCheckpointsToRetain; ++i) {
-			CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
-			doReturn(i).when(checkpointToAdd).getCheckpointID();
-			doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-
-			try {
-				zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
-
-				// The checkpoint should be in the store if we successfully add it into the store.
-				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-				assertTrue(addedCheckpoints.contains(checkpointToAdd));
-			} catch (Exception e) {
-				// The checkpoint should not be in the store if any exception is thrown.
-				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-				assertFalse(addedCheckpoints.contains(checkpointToAdd));
+			@Override
+			public long getStateSize() {
+				return 0;
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index c1a7b53..e9be145 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
@@ -90,8 +89,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
 			ZooKeeper.createClient(),
 			"/testPutAndRemoveJobGraph",
-			localStateStorage,
-			Executors.directExecutor());
+			localStateStorage);
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -147,7 +145,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 	@Test
 	public void testRecoverJobGraphs() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -198,10 +196,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 
 		try {
 			jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
 
 			otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
 
 
 			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
@@ -257,10 +255,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 	@Test(expected = IllegalStateException.class)
 	public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
 		ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
 		jobGraphs.start(null);
 		otherJobGraphs.start(null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
new file mode 100644
index 0000000..c4c5694
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * {@link ExternalResource} which starts a {@link org.apache.zookeeper.server.ZooKeeperServer}.
+ */
+public class ZooKeeperResource extends ExternalResource {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperResource.class);
+
+	@Nullable
+	private TestingServer zooKeeperServer;
+
+	public String getConnectString() {
+		verifyIsRunning();
+		return zooKeeperServer.getConnectString();
+	}
+
+	private void verifyIsRunning() {
+		Preconditions.checkState(zooKeeperServer != null);
+	}
+
+	@Override
+	protected void before() throws Throwable {
+		terminateZooKeeperServer();
+		zooKeeperServer = new TestingServer(true);
+	}
+
+	private void terminateZooKeeperServer() throws IOException {
+		if (zooKeeperServer != null) {
+			zooKeeperServer.stop();
+			zooKeeperServer = null;
+		}
+	}
+
+	@Override
+	protected void after() {
+		try {
+			terminateZooKeeperServer();
+		} catch (IOException e) {
+			LOG.warn("Could not properly terminate the {}.", getClass().getSimpleName(), e);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index fd39b25..2dd27e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -18,21 +18,19 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,7 +39,6 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -49,12 +46,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -88,8 +80,8 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 	@Test
 	public void testAddAndLock() throws Exception {
 		LongStateStorage longStateStorage = new LongStateStorage();
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZOOKEEPER.getClient(), longStateStorage, Executors.directExecutor());
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+			ZOOKEEPER.getClient(), longStateStorage);
 
 		// Config
 		final String pathInZooKeeper = "/testAdd";
@@ -136,7 +128,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath");
 
@@ -161,7 +153,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		when(client.inTransaction().create()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
+				client, stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
@@ -191,7 +183,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testReplace";
@@ -230,7 +222,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		RetrievableStateStorageHelper<Long> stateStorage = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateStorage, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateStorage);
 
 		store.replace("/testReplaceNonExistingPath", 0, 1L);
 	}
@@ -247,7 +239,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
+				client, stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
@@ -289,7 +281,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testGetAndExists";
@@ -314,7 +306,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		store.getAndLock("/testGetNonExistingPath");
 	}
@@ -328,7 +320,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testGetAll";
@@ -359,7 +351,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String basePath = "/testGetAllSortedByName";
@@ -393,7 +385,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testRemove";
@@ -401,50 +393,14 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		store.addAndLock(pathInZooKeeper, state);
 
+		final int numberOfGlobalDiscardCalls = LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls();
+
 		// Test
 		store.releaseAndTryRemove(pathInZooKeeper);
 
 		// Verify discarded
 		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
-	}
-
-	/**
-	 * Tests that state handles are correctly removed with a callback.
-	 */
-	@Test
-	public void testRemoveWithCallback() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testRemoveWithCallback";
-		final Long state = 27255442L;
-
-		store.addAndLock(pathInZooKeeper, state);
-
-		final CountDownLatch sync = new CountDownLatch(1);
-		ZooKeeperStateHandleStore.RemoveCallback<Long> callback = mock(ZooKeeperStateHandleStore.RemoveCallback.class);
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				sync.countDown();
-				return null;
-			}
-		}).when(callback).apply(any(RetrievableStateHandle.class));
-
-		// Test
-		store.releaseAndTryRemove(pathInZooKeeper, callback);
-
-		// Verify discarded and callback called
-		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
-
-		sync.await();
-
-		verify(callback, times(1))
-				.apply(any(RetrievableStateHandle.class));
+		assertEquals(numberOfGlobalDiscardCalls + 1, LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls());
 	}
 
 	/** Tests that all state handles are correctly discarded. */
@@ -454,7 +410,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testDiscardAll";
@@ -486,8 +442,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			stateStorage,
-			Executors.directExecutor());
+			stateStorage);
 
 		final Collection<Long> input = new HashSet<>();
 		input.add(1L);
@@ -543,13 +498,11 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String statePath = "/state";
 
@@ -586,13 +539,11 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String path = "/state";
 
@@ -649,8 +600,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 			ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 				client,
-				longStateStorage,
-				Executors.directExecutor());
+				longStateStorage);
 
 			final String path = "/state";
 
@@ -682,8 +632,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String path = "/state";
 
@@ -720,8 +669,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final Collection<String> paths = Arrays.asList("/state1", "/state2", "/state3");
 
@@ -775,9 +723,11 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		private static final long serialVersionUID = -3555329254423838912L;
 
+		private static int numberOfGlobalDiscardCalls = 0;
+
 		private final Long state;
 
-		private int numberOfDiscardCalls;
+		private int numberOfDiscardCalls = 0;
 
 		public LongRetrievableStateHandle(Long state) {
 			this.state = state;
@@ -790,6 +740,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		@Override
 		public void discardState() throws Exception {
+			numberOfGlobalDiscardCalls++;
 			numberOfDiscardCalls++;
 		}
 
@@ -798,8 +749,12 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 			return 0;
 		}
 
-		public int getNumberOfDiscardCalls() {
+		int getNumberOfDiscardCalls() {
 			return numberOfDiscardCalls;
 		}
+
+		public static int getNumberOfGlobalDiscardCalls() {
+			return numberOfGlobalDiscardCalls;
+		}
 	}
 }