You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/19 08:58:17 UTC

[4/4] flink git commit: [FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes

[FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes

In order to guard against deletions of ZooKeeper nodes which are still being used
by a different ZooKeeperStateHandleStore, we have to introduce a locking mechanism.
Only after all ZooKeeperStateHandleStores have released their lock, the ZNode is
allowed to be deleted.

The locking mechanism is implemented via ephemeral child nodes of the respective
ZooKeeper node. Whenever a ZooKeeperStateHandleStore wants to lock a ZNode, thus
protecting it from being deleted, it creates an ephemeral child node. The node's
name is unique to the ZooKeeperStateHandleStore instance. The delete operations
will then only delete the node if it does not have any children associated.

In order to guard against orphaned lock nodes, they are created as ephemeral nodes.
This means that they will be deleted by ZooKeeper once the connection of the
ZooKeeper client which created the node has timed out.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d119e11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d119e11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d119e11

Branch: refs/heads/master
Commit: 3d119e1155aa8930cc7b18a085d6790cb2c63b70
Parents: b8f8524
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 17 14:52:04 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri May 19 10:57:32 2017 +0200

----------------------------------------------------------------------
 .../store/ZooKeeperMesosWorkerStore.java        |   8 +-
 .../ZooKeeperCompletedCheckpointStore.java      | 150 ++--
 .../ZooKeeperSubmittedJobGraphStore.java        |  50 +-
 .../zookeeper/ZooKeeperStateHandleStore.java    | 419 +++++++---
 .../CompletedCheckpointStoreTest.java           |   9 +
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 133 ++-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  11 +-
 .../ZooKeeperStateHandleStoreITCase.java        | 642 ---------------
 .../ZooKeeperStateHandleStoreTest.java          | 805 +++++++++++++++++++
 9 files changed, 1345 insertions(+), 882 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 42abd4c..663ce56 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -88,7 +88,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 				totalTaskCountInZooKeeper.close();
 
 				if(cleanup) {
-					workersInZooKeeper.removeAndDiscardAllState();
+					workersInZooKeeper.releaseAndTryRemoveAll();
 				}
 
 				isRunning = false;
@@ -169,7 +169,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 		synchronized (startStopLock) {
 			verifyIsRunning();
 
-			List<Tuple2<RetrievableStateHandle<Worker>, String>> handles = workersInZooKeeper.getAll();
+			List<Tuple2<RetrievableStateHandle<Worker>, String>> handles = workersInZooKeeper.getAllAndLock();
 
 			if(handles.size() != 0) {
 				List<MesosWorkerStore.Worker> workers = new ArrayList<>(handles.size());
@@ -199,7 +199,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 			int currentVersion = workersInZooKeeper.exists(path);
 			if (currentVersion == -1) {
 				try {
-					workersInZooKeeper.add(path, worker);
+					workersInZooKeeper.addAndLock(path, worker);
 					LOG.debug("Added {} in ZooKeeper.", worker);
 				} catch (KeeperException.NodeExistsException ex) {
 					throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex);
@@ -227,7 +227,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 				return false;
 			}
 
-			workersInZooKeeper.removeAndDiscardState(path);
+			workersInZooKeeper.releaseAndTryRemove(path);
 			LOG.debug("Removed worker {} from ZooKeeper.", taskID);
 			return true;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 95cfb0f..084d93e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -19,9 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -34,12 +31,12 @@ import org.apache.flink.util.FlinkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -155,7 +152,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
 		while (true) {
 			try {
-				initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
+				initialCheckpoints = checkpointsInZooKeeper.getAllSortedByNameAndLock();
 				break;
 			}
 			catch (ConcurrentModificationException e) {
@@ -178,7 +175,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 					"checkpoint store.", e);
 
 				// remove the checkpoint with broken state handle
-				removeBrokenStateHandle(checkpointStateHandle);
+				removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
 			}
 
 			if (completedCheckpoint != null) {
@@ -201,7 +198,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		final RetrievableStateHandle<CompletedCheckpoint> stateHandle;
 
 		// First add the new one. If it fails, we don't want to loose existing data.
-		stateHandle = checkpointsInZooKeeper.add(path, checkpoint);
+		stateHandle = checkpointsInZooKeeper.addAndLock(path, checkpoint);
 
 		checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));
 
@@ -211,7 +208,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		// Everything worked, let's remove a previous checkpoint if necessary.
 		while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
 			try {
-				removeSubsumed(checkpointStateHandles.removeFirst(), sharedStateRegistry);
+				removeSubsumed(checkpointStateHandles.removeFirst().f1, sharedStateRegistry);
 			} catch (Exception e) {
 				LOG.warn("Failed to subsume the old checkpoint", e);
 			}
@@ -237,7 +234,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 					try {
 						// remove the checkpoint with broken state handle
-						removeBrokenStateHandle(checkpointStateHandles.pollLast());
+						Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint = checkpointStateHandles.pollLast();
+						removeBrokenStateHandle(checkpoint.f1, checkpoint.f0);
 					} catch (Exception removeException) {
 						LOG.warn("Could not remove the latest checkpoint with a broken state handle.", removeException);
 					}
@@ -265,7 +263,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 				// remove the checkpoint with broken state handle
 				stateHandleIterator.remove();
-				removeBrokenStateHandle(stateHandlePath);
+				removeBrokenStateHandle(stateHandlePath.f1, stateHandlePath.f0);
 			}
 		}
 
@@ -289,7 +287,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 			for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
 				try {
-					removeShutdown(checkpoint, jobStatus, sharedStateRegistry);
+					removeShutdown(checkpoint.f1, jobStatus, sharedStateRegistry);
 				} catch (Exception e) {
 					LOG.error("Failed to discard checkpoint.", e);
 				}
@@ -306,117 +304,87 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 			// Clear the local handles, but don't remove any state
 			checkpointStateHandles.clear();
+
+			// Release the state handle locks in ZooKeeper such that they can be deleted
+			checkpointsInZooKeeper.releaseAll();
 		}
 	}
 
 	// ------------------------------------------------------------------------
 
 	private void removeSubsumed(
-		final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
+		final String pathInZooKeeper,
 		final SharedStateRegistry sharedStateRegistry) throws Exception {
 		
-		Callable<Void> action = new Callable<Void>() {
+		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
 			@Override
-			public Void call() throws Exception {
-				CompletedCheckpoint completedCheckpoint = retrieveCompletedCheckpoint(stateHandleAndPath);
-				
-				if (completedCheckpoint != null) {
-					completedCheckpoint.discardOnSubsume(sharedStateRegistry);
-				}
+			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
+				if (value != null) {
+					final CompletedCheckpoint completedCheckpoint;
+					try {
+						completedCheckpoint = value.retrieveState();
+					} catch (Exception e) {
+						throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e);
+					}
 
-				return null;
+					if (completedCheckpoint != null) {
+						try {
+							completedCheckpoint.discardOnSubsume(sharedStateRegistry);
+						} catch (Exception e) {
+							throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
+						}
+					}
+				}
 			}
 		};
 
-		remove(stateHandleAndPath, action);
+		checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, action);
 	}
 
 	private void removeShutdown(
-			final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
+			final String pathInZooKeeper,
 			final JobStatus jobStatus,
 			final SharedStateRegistry sharedStateRegistry) throws Exception {
 
-		Callable<Void> action = new Callable<Void>() {
+		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
 			@Override
-			public Void call() throws Exception {
-				CompletedCheckpoint completedCheckpoint = retrieveCompletedCheckpoint(stateHandleAndPath);
-				
-				if (completedCheckpoint != null) {
-					completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
-				}
+			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
+				if (value != null) {
+					final CompletedCheckpoint completedCheckpoint;
+
+					try {
+						completedCheckpoint = value.retrieveState();
+					} catch (Exception e) {
+						throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e);
+					}
 
-				return null;
+					if (completedCheckpoint != null) {
+						try {
+							completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
+						} catch (Exception e) {
+							throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
+						}
+					}
+				}
 			}
 		};
 
-		remove(stateHandleAndPath, action);
-	}
-
-	private void removeBrokenStateHandle(final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
-		remove(stateHandleAndPath, null);
+		checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, removeAction);
 	}
 
-	/**
-	 * Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle.
-	 */
-	private void remove(
-			final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
-			final Callable<Void> action) throws Exception {
-
-		BackgroundCallback callback = new BackgroundCallback() {
+	private void removeBrokenStateHandle(
+			final String pathInZooKeeper,
+			final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle) throws Exception {
+		checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
 			@Override
-			public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
-				final long checkpointId = pathToCheckpointId(stateHandleAndPath.f1);
-
+			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
 				try {
-					if (event.getType() == CuratorEventType.DELETE) {
-						if (event.getResultCode() == 0) {
-							Exception exception = null;
-
-							if (null != action) {
-								try {
-									action.call();
-								} catch (Exception e) {
-									exception = new Exception("Could not execute callable action " +
-										"for checkpoint " + checkpointId + '.', e);
-								}
-							}
-
-							try {
-								// Discard the state handle
-								stateHandleAndPath.f0.discardState();
-							} catch (Exception e) {
-								Exception newException = new Exception("Could not discard meta " +
-									"data for completed checkpoint " + checkpointId + '.', e);
-
-								if (exception == null) {
-									exception = newException;
-								} else {
-									exception.addSuppressed(newException);
-								}
-							}
-
-							if (exception != null) {
-								throw exception;
-							}
-						} else {
-							throw new IllegalStateException("Unexpected result code " +
-									event.getResultCode() + " in '" + event + "' callback.");
-						}
-					} else {
-						throw new IllegalStateException("Unexpected event type " +
-								event.getType() + " in '" + event + "' callback.");
-					}
+					retrievableStateHandle.discardState();
 				} catch (Exception e) {
-					LOG.warn("Failed to discard checkpoint {}.", checkpointId, e);
+					throw new FlinkException("Could not discard state handle.", e);
 				}
 			}
-		};
-
-		// Remove state handle from ZooKeeper first. If this fails, we can still recover, but if
-		// we remove a state handle and fail to remove it from ZooKeeper, we end up in an
-		// inconsistent state.
-		checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
+		});
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 2552088..fa972ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -157,36 +157,46 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	@Override
 	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
 		checkNotNull(jobId, "Job ID");
-		String path = getPathForJob(jobId);
+		final String path = getPathForJob(jobId);
 
 		LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
 
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
-			RetrievableStateHandle<SubmittedJobGraph> jobGraphRetrievableStateHandle;
+			boolean success = false;
 
 			try {
-				jobGraphRetrievableStateHandle = jobGraphsInZooKeeper.get(path);
-			} catch (KeeperException.NoNodeException ignored) {
-				return null;
-			} catch (Exception e) {
-				throw new Exception("Could not retrieve the submitted job graph state handle " +
-					"for " + path + "from the submitted job graph store.", e);
-			}
-			SubmittedJobGraph jobGraph;
+				RetrievableStateHandle<SubmittedJobGraph> jobGraphRetrievableStateHandle;
 
-			try {
-				jobGraph = jobGraphRetrievableStateHandle.retrieveState();
-			} catch (Exception e) {
-				throw new Exception("Failed to retrieve the submitted job graph from state handle.", e);
-			}
+				try {
+					jobGraphRetrievableStateHandle = jobGraphsInZooKeeper.getAndLock(path);
+				} catch (KeeperException.NoNodeException ignored) {
+					success = true;
+					return null;
+				} catch (Exception e) {
+					throw new Exception("Could not retrieve the submitted job graph state handle " +
+						"for " + path + "from the submitted job graph store.", e);
+				}
+				SubmittedJobGraph jobGraph;
 
-			addedJobGraphs.add(jobGraph.getJobId());
+				try {
+					jobGraph = jobGraphRetrievableStateHandle.retrieveState();
+				} catch (Exception e) {
+					throw new Exception("Failed to retrieve the submitted job graph from state handle.", e);
+				}
 
-			LOG.info("Recovered {}.", jobGraph);
+				addedJobGraphs.add(jobGraph.getJobId());
 
-			return jobGraph;
+				LOG.info("Recovered {}.", jobGraph);
+
+				success = true;
+				return jobGraph;
+			} finally {
+				if (!success) {
+					jobGraphsInZooKeeper.release(path);
+				}
+			}
 		}
 	}
 
@@ -207,7 +217,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 
 				if (currentVersion == -1) {
 					try {
-						jobGraphsInZooKeeper.add(path, jobGraph);
+						jobGraphsInZooKeeper.addAndLock(path, jobGraph);
 
 						addedJobGraphs.add(jobGraph.getJobId());
 
@@ -245,7 +255,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 
 		synchronized (cacheLock) {
 			if (addedJobGraphs.contains(jobId)) {
-				jobGraphsInZooKeeper.removeAndDiscardState(path);
+				jobGraphsInZooKeeper.releaseAndTryRemove(path);
 
 				addedJobGraphs.remove(jobId);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 364ba0f..a548f1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -20,28 +20,38 @@ package org.apache.flink.runtime.zookeeper;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * State handles backed by ZooKeeper.
+ * Class which stores state via the provided {@link RetrievableStateStorageHelper} and writes the
+ * returned state handle to ZooKeeper. The ZooKeeper node can be locked by creating an ephemeral
+ * child and only allowing the deletion of the ZooKeeper node if it does not have any children.
+ * That way we protect concurrent accesses from different ZooKeeperStateHandleStore instances.
  *
  * <p>Added state is persisted via {@link RetrievableStateHandle RetrievableStateHandles},
  * which in turn are written to ZooKeeper. This level of indirection is necessary to keep the
@@ -80,6 +90,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 	private final Executor executor;
 
+	/** Lock node name of this ZooKeeperStateHandleStore. The name should be unique among all other state handle stores. */
+	private final String lockNode;
+
 	/**
 	 * Creates a {@link ZooKeeperStateHandleStore}.
 	 *
@@ -99,40 +112,36 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		this.client = checkNotNull(client, "Curator client");
 		this.storage = checkNotNull(storage, "State storage");
 		this.executor = checkNotNull(executor);
-	}
 
-	/**
-	 * Creates a state handle and stores it in ZooKeeper with create mode {@link
-	 * CreateMode#PERSISTENT}.
-	 *
-	 * @see #add(String, T, CreateMode)
-	 */
-	public RetrievableStateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
-		return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+		// Generate a unique lock node name
+		lockNode = UUID.randomUUID().toString();
 	}
 
 	/**
-	 * Creates a state handle and stores it in ZooKeeper.
+	 * Creates a state handle, stores it in ZooKeeper and locks it. A locked node cannot be removed by
+	 * another {@link ZooKeeperStateHandleStore} instance as long as this instance remains connected
+	 * to ZooKeeper.
 	 *
 	 * <p><strong>Important</strong>: This will <em>not</em> store the actual state in
 	 * ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection
 	 * makes sure that data in ZooKeeper is small.
 	 *
-	 * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and
-	 *                        start with a '/')
+	 * <p>The operation will fail if there is already an node under the given path
+	 *
+	 * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet)
 	 * @param state           State to be added
-	 * @param createMode      The create mode for the new path in ZooKeeper
 	 *
 	 * @return The Created {@link RetrievableStateHandle}.
 	 * @throws Exception If a ZooKeeper or state handle operation fails
 	 */
-	public RetrievableStateHandle<T> add(
+	public RetrievableStateHandle<T> addAndLock(
 			String pathInZooKeeper,
-			T state,
-			CreateMode createMode) throws Exception {
+			T state) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 		checkNotNull(state, "State");
 
+		final String path = normalizePath(pathInZooKeeper);
+
 		RetrievableStateHandle<T> storeHandle = storage.store(state);
 
 		boolean success = false;
@@ -145,7 +154,11 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 			// smaller than the state itself. This level of indirection makes sure that data in
 			// ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
 			// the state can be larger.
-			client.create().withMode(createMode).forPath(pathInZooKeeper, serializedStoreHandle);
+			// Create the lock node in a transaction with the actual state node. That way we can prevent
+			// race conditions with a concurrent delete operation.
+			client.inTransaction().create().withMode(CreateMode.PERSISTENT).forPath(path, serializedStoreHandle)
+				.and().create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path))
+				.and().commit();
 
 			success = true;
 			return storeHandle;
@@ -172,7 +185,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 		checkNotNull(state, "State");
 
-		RetrievableStateHandle<T> oldStateHandle = get(pathInZooKeeper);
+		final String path = normalizePath(pathInZooKeeper);
+
+		RetrievableStateHandle<T> oldStateHandle = get(path, false);
 
 		RetrievableStateHandle<T> newStateHandle = storage.store(state);
 
@@ -185,7 +200,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 			// Replace state handle in ZooKeeper.
 			client.setData()
 					.withVersion(expectedVersion)
-					.forPath(pathInZooKeeper, serializedStateHandle);
+					.forPath(path, serializedStateHandle);
 			success = true;
 		} finally {
 			if(success) {
@@ -207,7 +222,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	public int exists(String pathInZooKeeper) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
-		Stat stat = client.checkExists().forPath(pathInZooKeeper);
+		final String path = normalizePath(pathInZooKeeper);
+
+		Stat stat = client.checkExists().forPath(path);
 
 		if (stat != null) {
 			return stat.getVersion();
@@ -217,32 +234,17 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	}
 
 	/**
-	 * Gets a state handle from ZooKeeper.
+	 * Gets the {@link RetrievableStateHandle} stored in the given ZooKeeper node and locks it. A
+	 * locked node cannot be removed by another {@link ZooKeeperStateHandleStore} instance as long
+	 * as this instance remains connected to ZooKeeper.
 	 *
-	 * @param pathInZooKeeper Path in ZooKeeper to get the state handle from (expected to
-	 *                        exist and start with a '/').
-	 * @return The state handle
-	 * @throws Exception If a ZooKeeper or state handle operation fails
+	 * @param pathInZooKeeper Path to the ZooKeeper node which contains the state handle
+	 * @return The retrieved state handle from the specified ZooKeeper node
+	 * @throws IOException Thrown if the method failed to deserialize the stored state handle
+	 * @throws Exception Thrown if a ZooKeeper operation failed
 	 */
-	@SuppressWarnings("unchecked")
-	public RetrievableStateHandle<T> get(String pathInZooKeeper) throws Exception {
-		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
-
-		byte[] data;
-
-		try {
-			data = client.getData().forPath(pathInZooKeeper);
-		} catch (Exception e) {
-			throw new Exception("Failed to retrieve state handle data under " + pathInZooKeeper +
-				" from ZooKeeper.", e);
-		}
-
-		try {
-			return InstantiationUtil.deserializeObject(data, Thread.currentThread().getContextClassLoader());
-		} catch (IOException | ClassNotFoundException e) {
-			throw new IOException("Failed to deserialize state handle from ZooKeeper data from " +
-				pathInZooKeeper + '.', e);
-		}
+	public RetrievableStateHandle<T> getAndLock(String pathInZooKeeper) throws Exception {
+		return get(pathInZooKeeper, true);
 	}
 
 	/**
@@ -270,7 +272,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	}
 
 	/**
-	 * Gets all available state handles from ZooKeeper.
+	 * Gets all available state handles from ZooKeeper and locks the respective state nodes.
 	 *
 	 * <p>If there is a concurrent modification, the operation is retried until it succeeds.
 	 *
@@ -278,7 +280,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 * @throws Exception If a ZooKeeper or state handle operation fails
 	 */
 	@SuppressWarnings("unchecked")
-	public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() throws Exception {
+	public List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock() throws Exception {
 		final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
 
 		boolean success = false;
@@ -300,7 +302,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 					path = "/" + path;
 
 					try {
-						final RetrievableStateHandle<T> stateHandle = get(path);
+						final RetrievableStateHandle<T> stateHandle = getAndLock(path);
 						stateHandles.add(new Tuple2<>(stateHandle, path));
 					} catch (KeeperException.NoNodeException ignored) {
 						// Concurrent deletion, retry
@@ -323,7 +325,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 
 	/**
-	 * Gets all available state handles from ZooKeeper sorted by name (ascending).
+	 * Gets all available state handles from ZooKeeper sorted by name (ascending) and locks the
+	 * respective state nodes.
 	 *
 	 * <p>If there is a concurrent modification, the operation is retried until it succeeds.
 	 *
@@ -331,7 +334,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 * @throws Exception If a ZooKeeper or state handle operation fails
 	 */
 	@SuppressWarnings("unchecked")
-	public List<Tuple2<RetrievableStateHandle<T>, String>> getAllSortedByName() throws Exception {
+	public List<Tuple2<RetrievableStateHandle<T>, String>> getAllSortedByNameAndLock() throws Exception {
 		final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
 
 		boolean success = false;
@@ -355,14 +358,16 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 					path = "/" + path;
 
 					try {
-						final RetrievableStateHandle<T> stateHandle = get(path);
+						final RetrievableStateHandle<T> stateHandle = getAndLock(path);
 						stateHandles.add(new Tuple2<>(stateHandle, path));
 					} catch (KeeperException.NoNodeException ignored) {
 						// Concurrent deletion, retry
 						continue retry;
 					} catch (IOException ioException) {
 						LOG.warn("Could not get all ZooKeeper children. Node {} contained " +
-							"corrupted data. Ignoring this node.", path, ioException);
+							"corrupted data. Releasing and trying to remove this node.", path, ioException);
+
+						releaseAndTryRemove(path);
 					}
 				}
 
@@ -370,6 +375,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 				// Check for concurrent modifications
 				success = initialCVersion == finalCVersion;
+
+				// we don't have to release all locked nodes in case of a concurrent modification, because we
+				// will retrieve them in the next iteration again.
 			}
 		}
 
@@ -377,75 +385,306 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	}
 
 	/**
-	 * Removes a state handle from ZooKeeper.
+	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
+	 * The deletion of the state node is executed asynchronously.
 	 *
-	 * <p><strong>Important</strong>: this does not discard the state handle. If you want to
-	 * discard the state handle call {@link #removeAndDiscardState(String)}.
+	 * <p><strong>Important</strong>: This also discards the stored state handle after the state node
+	 * has been successfully deleted.
 	 *
 	 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
 	 * @throws Exception If the ZooKeeper operation fails
 	 */
-	public void remove(String pathInZooKeeper) throws Exception {
-		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
-
-		client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
+	public void releaseAndTryRemove(String pathInZooKeeper) throws Exception {
+		releaseAndTryRemove(pathInZooKeeper, null);
 	}
 
 	/**
-	 * Removes a state handle from ZooKeeper asynchronously.
+	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
+	 * The deletion of the state node is executed asynchronously. After the state node has been deleted, the given
+	 * callback is called with the {@link RetrievableStateHandle} of the deleted state node.
 	 *
-	 * <p><strong>Important</strong>: this does not discard the state handle. If you want to
-	 * discard the state handle call {@link #removeAndDiscardState(String)}.
+	 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
+	 * has been executed.
 	 *
-	 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
-	 * @param callback        The callback after the operation finishes
+	 * @param pathInZooKeeper Path of state handle to remove
+	 * @param callback The callback to execute after a successful deletion. Null if no action needs to be executed.
 	 * @throws Exception If the ZooKeeper operation fails
 	 */
-	public void remove(String pathInZooKeeper, BackgroundCallback callback) throws Exception {
+	public void releaseAndTryRemove(
+			String pathInZooKeeper,
+			@Nullable final RemoveCallback<T> callback) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
-		checkNotNull(callback, "Background callback");
 
-		client.delete().deletingChildrenIfNeeded().inBackground(callback, executor).forPath(pathInZooKeeper);
+		final String path = normalizePath(pathInZooKeeper);
+
+		RetrievableStateHandle<T> stateHandle = null;
+
+		try {
+			stateHandle = get(path, false);
+		} catch (Exception e) {
+			LOG.warn("Could not retrieve the state handle from node " + path + '.', e);
+		}
+
+		release(pathInZooKeeper);
+
+		final BackgroundCallback backgroundCallback = new RemoveBackgroundCallback<>(stateHandle, callback, path);
+
+		client.delete().inBackground(backgroundCallback, executor).forPath(path);
 	}
 
 	/**
-	 * Discards a state handle and removes it from ZooKeeper.
+	 * Releases all lock nodes of this ZooKeeperStateHandleStore and tries to remove all state nodes which
+	 * are not locked anymore.
 	 *
-	 * <p>If you only want to remove the state handle in ZooKeeper call {@link #remove(String)}.
+	 * <p>The delete operation is executed asynchronously.
 	 *
-	 * @param pathInZooKeeper Path of state handle to discard (expected to start with a '/')
-	 * @throws Exception If the ZooKeeper or state handle operation fails
+	 * @throws Exception if the delete operation fails
 	 */
-	public void removeAndDiscardState(String pathInZooKeeper) throws Exception {
+	public void releaseAndTryRemoveAll() throws Exception {
+		Collection<String> children = getAllPaths();
+
+		Exception exception = null;
+
+		for (String child : children) {
+			try {
+				releaseAndTryRemove('/' + child);
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+		}
+
+		if (exception != null) {
+			throw new Exception("Could not properly release and try removing all state nodes.", exception);
+		}
+	}
+
+	/**
+	 * Releases the lock from the node under the given ZooKeeper path. If no lock exists, then nothing happens.
+	 *
+	 * @param pathInZooKeeper Path describing the ZooKeeper node
+	 * @throws Exception if the delete operation of the lock node fails
+	 */
+	public void release(String pathInZooKeeper) throws Exception {
+		final String path = normalizePath(pathInZooKeeper);
+
+		try {
+			client.delete().forPath(getLockPath(path));
+		} catch (KeeperException.NoNodeException ignored) {
+			// we have never locked this node
+		} catch (Exception e) {
+			throw new Exception("Could not release the lock: " + getLockPath(pathInZooKeeper) + '.', e);
+		}
+	}
+
+	/**
+	 * Releases all lock nodes of this ZooKeeperStateHandleStore.
+	 *
+	 * @throws Exception if the delete operation of a lock file fails
+	 */
+	public void releaseAll() throws Exception {
+		Collection<String> children = getAllPaths();
+
+		Exception exception = null;
+
+		for (String child: children) {
+			try {
+				release(child);
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+		}
+
+		if (exception != null) {
+			throw new Exception("Could not properly release all state nodes.", exception);
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------------------
+	// Protected methods
+	// ---------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the path for the lock node relative to the given path.
+	 *
+	 * @param rootPath Root path under which the lock node shall be created
+	 * @return Path for the lock node
+	 */
+	protected String getLockPath(String rootPath) {
+		return rootPath + '/' + lockNode;
+	}
+
+	// ---------------------------------------------------------------------------------------------------------
+	// Private methods
+	// ---------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets a state handle from ZooKeeper and optionally locks it.
+	 *
+	 * @param pathInZooKeeper Path in ZooKeeper to get the state handle from
+	 * @param lock True if we should lock the node; otherwise false
+	 * @return The state handle
+	 * @throws IOException Thrown if the method failed to deserialize the stored state handle
+	 * @throws Exception Thrown if a ZooKeeper operation failed
+	 */
+	@SuppressWarnings("unchecked")
+	private RetrievableStateHandle<T> get(String pathInZooKeeper, boolean lock) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
-		RetrievableStateHandle<T> stateHandle = get(pathInZooKeeper);
+		final String path = normalizePath(pathInZooKeeper);
+
+		if (lock) {
+			// try to lock the node
+			try {
+				client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
+			} catch (KeeperException.NodeExistsException ignored) {
+				// we have already created the lock
+			} catch (KeeperException.NoNodeException e) {
+				throw new Exception("Cannot lock the node " + path + " since it does not exist.", e);
+			}
+		}
+
+		boolean success = false;
+
+		try {
+			byte[] data;
+
+			try {
+				data = client.getData().forPath(path);
+			} catch (Exception e) {
+				throw new Exception("Failed to retrieve state handle data under " + path +
+					" from ZooKeeper.", e);
+			}
+
+			try {
+				RetrievableStateHandle<T> retrievableStateHandle = InstantiationUtil.deserializeObject(
+					data,
+					Thread.currentThread().getContextClassLoader());
 
-		// Delete the state handle from ZooKeeper first
-		client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
+				success = true;
 
-		// Discard the state handle only after it has been successfully deleted from ZooKeeper.
-		// Otherwise we might enter an illegal state after failures (with a state handle in
-		// ZooKeeper, which has already been discarded).
-		stateHandle.discardState();
+				return retrievableStateHandle;
+			} catch (IOException | ClassNotFoundException e) {
+				throw new IOException("Failed to deserialize state handle from ZooKeeper data from " +
+					path + '.', e);
+			}
+		} finally {
+			if (!success && lock) {
+				// release the lock
+				release(path);
+			}
+		}
 	}
 
 	/**
-	 * Discards all available state handles and removes them from ZooKeeper.
+	 * Makes sure that every path starts with a "/"
 	 *
-	 * @throws Exception If a ZooKeeper or state handle operation fails
+	 * @param path Path to normalize
+	 * @return Normalized path such that it starts with a "/"
 	 */
-	public void removeAndDiscardAllState() throws Exception {
-		final List<Tuple2<RetrievableStateHandle<T>, String>> allStateHandles = getAll();
+	private static String normalizePath(String path) {
+		if (path.startsWith("/")) {
+			return path;
+		} else {
+			return '/' + path;
+		}
+	}
 
-		ZKPaths.deleteChildren(
-				client.getZookeeperClient().getZooKeeper(),
-				ZKPaths.fixForNamespace(client.getNamespace(), "/"),
-				false);
+	// ---------------------------------------------------------------------------------------------------------
+	// Utility classes
+	// ---------------------------------------------------------------------------------------------------------
 
-		// Discard the state handles only after they have been successfully deleted from ZooKeeper.
-		for (Tuple2<RetrievableStateHandle<T>, String> stateHandleAndPath : allStateHandles) {
-			stateHandleAndPath.f0.discardState();
+	/**
+	 * Callback which is executed when removing a node from ZooKeeper. The callback will call the given
+	 * {@link RemoveCallback} if it is not null. Afterwards, it will discard the given {@link RetrievableStateHandle}
+	 * if it is not null.
+	 *
+	 * @param <T> Type of the value stored in the RetrievableStateHandle
+	 */
+	private static final class RemoveBackgroundCallback<T extends Serializable> implements BackgroundCallback {
+		@Nullable
+		private final RetrievableStateHandle<T> stateHandle;
+
+		@Nullable
+		private final RemoveCallback<T> callback;
+
+		private final String pathInZooKeeper;
+
+		private RemoveBackgroundCallback(
+			@Nullable RetrievableStateHandle<T> stateHandle,
+			@Nullable RemoveCallback<T> callback,
+			String pathInZooKeeper) {
+
+			this.stateHandle = stateHandle;
+			this.callback = callback;
+			this.pathInZooKeeper = Preconditions.checkNotNull(pathInZooKeeper);
 		}
+
+		@Override
+		public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+			try {
+				if (event.getType() == CuratorEventType.DELETE) {
+					final KeeperException.Code resultCode = KeeperException.Code.get(event.getResultCode());
+
+					if (resultCode == KeeperException.Code.OK) {
+						Exception exception = null;
+
+						if (null != callback) {
+							try {
+								callback.apply(stateHandle);
+							} catch (Throwable e) {
+								exception = new Exception("Could not execute delete action for node " +
+									pathInZooKeeper + '.', e);
+							}
+						}
+
+						if (stateHandle != null) {
+							try {
+								// Discard the state handle
+								stateHandle.discardState();
+							} catch (Throwable e) {
+								Exception newException = new Exception("Could not discard state handle of node " +
+									pathInZooKeeper + '.', e);
+
+								if (exception == null) {
+									exception = newException;
+								} else {
+									exception.addSuppressed(newException);
+								}
+							}
+						}
+
+						if (exception != null) {
+							throw exception;
+						}
+					} else if (resultCode == KeeperException.Code.NOTEMPTY) {
+						// Could not delete the node because it still contains children/locks
+						LOG.debug("Could not delete node " + pathInZooKeeper + " because it is still locked.");
+					} else {
+						throw new IllegalStateException("Unexpected result code " +
+							resultCode.name() + " in '" + event + "' callback.");
+					}
+				} else {
+					throw new IllegalStateException("Unexpected event type " +
+						event.getType() + " in '" + event + "' callback.");
+				}
+			} catch (Exception e) {
+				LOG.warn("Failed to run callback for delete operation on node " + pathInZooKeeper + '.', e);
+			}
+
+		}
+	};
+
+	/**
+	 * Callback interface for remove calls
+	 */
+	public interface RemoveCallback<T extends Serializable> {
+		/**
+		 * Callback method. The parameter can be null if the {@link RetrievableStateHandle} could not be retrieved
+		 * from ZooKeeper.
+		 *
+		 * @param value RetrievableStateHandle retrieved from ZooKeeper, null if it was not retrievable
+		 * @throws FlinkException If the callback failed
+		 */
+		void apply(@Nullable RetrievableStateHandle<T> value) throws FlinkException;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 94bd12f..985c662 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -307,6 +308,14 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			}
 		}
 
+		public boolean awaitDiscard(long timeout) throws InterruptedException {
+			if (discardLatch != null) {
+				return discardLatch.await(timeout, TimeUnit.MILLISECONDS);
+			} else {
+				return false;
+			}
+		}
+
 		@Override
 		public boolean equals(Object o) {
 			if (this == o) return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 3fd7f1b..0d93289 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -24,50 +24,52 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for basic {@link CompletedCheckpointStore} contract and ZooKeeper state handling.
  */
 public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {
 
-	private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+	private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);
 
-	private final static String CheckpointsPath = "/checkpoints";
+	private static final String CHECKPOINT_PATH = "/checkpoints";
 
 	@AfterClass
 	public static void tearDown() throws Exception {
-		if (ZooKeeper != null) {
-			ZooKeeper.shutdown();
+		if (ZOOKEEPER != null) {
+			ZOOKEEPER.shutdown();
 		}
 	}
 
 	@Before
 	public void cleanUp() throws Exception {
-		ZooKeeper.deleteAll();
+		ZOOKEEPER.deleteAll();
 	}
 
 	@Override
-	protected AbstractCompletedCheckpointStore createCompletedCheckpoints(
-			int maxNumberOfCheckpointsToRetain) throws Exception {
-
+	protected ZooKeeperCompletedCheckpointStore createCompletedCheckpoints(int maxNumberOfCheckpointsToRetain) throws Exception {
 		return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain,
-			ZooKeeper.createClient(), CheckpointsPath, new RetrievableStateStorageHelper<CompletedCheckpoint>() {
-			@Override
-			public RetrievableStateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
-				return new HeapRetrievableStateHandle<>(state);
-			}
-		}, Executors.directExecutor());
+			ZOOKEEPER.getClient(),
+			CHECKPOINT_PATH,
+			new HeapStateStorageHelper(),
+			Executors.directExecutor());
 	}
 
 	// ---------------------------------------------------------------------------------------------
@@ -95,7 +97,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		verifyCheckpointRegistered(expected[2].getOperatorStates().values(), checkpoints.sharedStateRegistry);
 
 		// All three should be in ZK
-		assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
+		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
 		resetCheckpoint(expected[0].getOperatorStates().values());
@@ -105,7 +107,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		// Recover TODO!!! clear registry!
 		checkpoints.recover();
 
-		assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
+		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 		assertEquals(expected[2], checkpoints.getLatestCheckpoint());
 
@@ -130,18 +132,18 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 	 */
 	@Test
 	public void testShutdownDiscardsCheckpoints() throws Exception {
-		CuratorFramework client = ZooKeeper.getClient();
+		CuratorFramework client = ZOOKEEPER.getClient();
 
 		CompletedCheckpointStore store = createCompletedCheckpoints(1);
 		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
+		assertNotNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		store.shutdown(JobStatus.FINISHED);
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
-		assertNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
+		assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		store.recover();
 
@@ -149,24 +151,30 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 	}
 
 	/**
-	 * Tests that suspends keeps all checkpoints (as they can be recovered
-	 * later by the ZooKeeper store).
+	 * Tests that suspending keeps all checkpoints (so that they can be recovered
+	 * later by the ZooKeeper store). Furthermore, suspending a job should release
+	 * all locks.
 	 */
 	@Test
 	public void testSuspendKeepsCheckpoints() throws Exception {
-		CuratorFramework client = ZooKeeper.getClient();
+		CuratorFramework client = ZOOKEEPER.getClient();
 
 		CompletedCheckpointStore store = createCompletedCheckpoints(1);
 		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
+		assertNotNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		store.shutdown(JobStatus.SUSPENDED);
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
+
+		final String checkpointPath = CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID());
+		Stat stat = client.checkExists().forPath(checkpointPath);
+
+		assertNotNull("The checkpoint node should exist.", stat);
+		assertEquals("The checkpoint node should not be locked.", 0, stat.getNumChildren());
 
 		// Recover again
 		store.recover();
@@ -201,24 +209,91 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(checkpoints.get(checkpoints.size() -1), latestCheckpoint);
 	}
 
+	/**
+	 * FLINK-6612
+	 *
+	 * Checks that a concurrent checkpoint completion won't discard a checkpoint which has been
+	 * recovered by a different completed checkpoint store.
+	 */
+	@Test
+	public void testConcurrentCheckpointOperations() throws Exception {
+		final int numberOfCheckpoints = 1;
+		final long waitingTimeout = 50L;
+
+		ZooKeeperCompletedCheckpointStore zkCheckpointStore1 = createCompletedCheckpoints(numberOfCheckpoints);
+		ZooKeeperCompletedCheckpointStore zkCheckpointStore2 = createCompletedCheckpoints(numberOfCheckpoints);
+
+		TestCompletedCheckpoint completedCheckpoint = createCheckpoint(1);
+
+		// complete the first checkpoint
+		zkCheckpointStore1.addCheckpoint(completedCheckpoint);
+
+		// recover the checkpoint by a different checkpoint store
+		zkCheckpointStore2.recover();
+
+		CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint();
+		assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint);
+		TestCompletedCheckpoint recoveredTestCheckpoint = (TestCompletedCheckpoint) recoveredCheckpoint;
+
+		// Check that the recovered checkpoint is not yet discarded
+		assertFalse(recoveredTestCheckpoint.isDiscarded());
+
+		// complete another checkpoint --> this should remove the first checkpoint from the store
+		// because the number of retained checkpoints == 1
+		TestCompletedCheckpoint completedCheckpoint2 = createCheckpoint(2);
+		zkCheckpointStore1.addCheckpoint(completedCheckpoint2);
+
+		List<CompletedCheckpoint> allCheckpoints = zkCheckpointStore1.getAllCheckpoints();
+
+		// check that we have removed the first checkpoint from zkCompletedStore1
+		assertEquals(Collections.singletonList(completedCheckpoint2), allCheckpoints);
+
+		// let's wait a little bit to see that no discard operation will be executed
+		assertFalse("The checkpoint should not have been discarded.", recoveredTestCheckpoint.awaitDiscard(waitingTimeout));
+
+		// check that we have not discarded the first completed checkpoint
+		assertFalse(recoveredTestCheckpoint.isDiscarded());
+
+		TestCompletedCheckpoint completedCheckpoint3 = createCheckpoint(3);
+
+		// this should release the last lock on completedCheckpoint and thus discard it
+		zkCheckpointStore2.addCheckpoint(completedCheckpoint3);
+
+		// the checkpoint should be discarded eventually because there is no lock on it anymore
+		recoveredTestCheckpoint.awaitDiscard();
+	}
+
+
+	static class HeapStateStorageHelper implements RetrievableStateStorageHelper<CompletedCheckpoint> {
+		@Override
+		public RetrievableStateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
+			return new HeapRetrievableStateHandle<>(state);
+		}
+	}
+
 	static class HeapRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
 
 		private static final long serialVersionUID = -268548467968932L;
 
+		private static AtomicInteger nextKey = new AtomicInteger(0);
+
+		private static HashMap<Integer, Object> stateMap = new HashMap<>();
+
+		private final int key;
+
 		public HeapRetrievableStateHandle(T state) {
-			this.state = state;
+			key = nextKey.getAndIncrement();
+			stateMap.put(key, state);
 		}
 
-		private T state;
-
 		@Override
 		public T retrieveState() throws Exception {
-			return state;
+			return (T) stateMap.get(key);
 		}
 
 		@Override
 		public void discardState() throws Exception {
-			state = null;
+			stateMap.remove(key);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 0d22dc6..7d22d8e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -110,7 +110,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
 		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByName();
+		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
 
 		final int numCheckpointsToRetain = 1;
 
@@ -126,7 +126,6 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 		when(
 			client
 				.delete()
-				.deletingChildrenIfNeeded()
 				.inBackground(any(BackgroundCallback.class), any(Executor.class))
 		).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
 			@Override
@@ -150,13 +149,13 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 		});
 
 		final String checkpointsPath = "foobar";
-		final RetrievableStateStorageHelper<CompletedCheckpoint> stateSotrage = mock(RetrievableStateStorageHelper.class);
+		final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);
 
 		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
 			numCheckpointsToRetain,
 			client,
 			checkpointsPath,
-			stateSotrage,
+			stateStorage,
 			Executors.directExecutor());
 
 		zooKeeperCompletedCheckpointStore.recover();
@@ -209,9 +208,9 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 				
 				return retrievableStateHandle;
 			}
-		}).when(zookeeperStateHandleStoreMock).add(anyString(), any(CompletedCheckpoint.class));
+		}).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class));
 		
-		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).remove(anyString(), any(BackgroundCallback.class));
+		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(), any(ZooKeeperStateHandleStore.RemoveCallback.class));
 		
 		final int numCheckpointsToRetain = 1;
 		final String checkpointsPath = "foobar";

http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
deleted file mode 100644
index 4dc4c6b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
+++ /dev/null
@@ -1,642 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.zookeeper;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.TestLogger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for basic {@link ZooKeeperStateHandleStore} behaviour.
- *
- * <p> Tests include:
- * <ul>
- * <li>Expected usage of operations</li>
- * <li>Correct ordering of ZooKeeper and state handle operations</li>
- * </ul>
- */
-public class ZooKeeperStateHandleStoreITCase extends TestLogger {
-
-	private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (ZooKeeper != null) {
-			ZooKeeper.shutdown();
-		}
-	}
-
-	@Before
-	public void cleanUp() throws Exception {
-		ZooKeeper.deleteAll();
-	}
-
-	/**
-	 * Tests add operation with default {@link CreateMode}.
-	 */
-	@Test
-	public void testAdd() throws Exception {
-		LongStateStorage longStateStorage = new LongStateStorage();
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZooKeeper.getClient(), longStateStorage, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testAdd";
-		final Long state = 1239712317L;
-
-		// Test
-		store.add(pathInZooKeeper, state);
-
-		// Verify
-		// State handle created
-		assertEquals(1, store.getAll().size());
-		assertEquals(state, store.get(pathInZooKeeper).retrieveState());
-
-		// Path created and is persistent
-		Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
-		assertNotNull(stat);
-		assertEquals(0, stat.getEphemeralOwner());
-
-		// Data is equal
-		@SuppressWarnings("unchecked")
-		Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
-				ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-				ClassLoader.getSystemClassLoader())).retrieveState();
-
-		assertEquals(state, actual);
-	}
-
-	/**
-	 * Tests that {@link CreateMode} is respected.
-	 */
-	@Test
-	public void testAddWithCreateMode() throws Exception {
-		LongStateStorage longStateStorage = new LongStateStorage();
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZooKeeper.getClient(), longStateStorage, Executors.directExecutor());
-
-		// Config
-		Long state = 3457347234L;
-
-		CreateMode[] modes = CreateMode.values();
-		for (int i = 0; i < modes.length; i++) {
-			CreateMode mode = modes[i];
-			state += i;
-
-			String pathInZooKeeper = "/testAddWithCreateMode" + mode.name();
-
-			// Test
-			store.add(pathInZooKeeper, state, mode);
-
-			if (mode.isSequential()) {
-				// Figure out the sequential ID
-				List<String> paths = ZooKeeper.getClient().getChildren().forPath("/");
-				for (String p : paths) {
-					if (p.startsWith("testAddWithCreateMode" + mode.name())) {
-						pathInZooKeeper = "/" + p;
-						break;
-					}
-				}
-			}
-
-			// Verify
-			// State handle created
-			assertEquals(i + 1, store.getAll().size());
-			assertEquals(state, longStateStorage.getStateHandles().get(i).retrieveState());
-
-			// Path created
-			Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
-
-			assertNotNull(stat);
-
-			// Is ephemeral or persistent
-			if (mode.isEphemeral()) {
-				assertTrue(stat.getEphemeralOwner() != 0);
-			}
-			else {
-				assertEquals(0, stat.getEphemeralOwner());
-			}
-
-			// Data is equal
-			@SuppressWarnings("unchecked")
-			Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
-					ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-					ClassLoader.getSystemClassLoader())).retrieveState();
-
-			assertEquals(state, actual);
-		}
-	}
-
-	/**
-	 * Tests that an existing path throws an Exception.
-	 */
-	@Test(expected = Exception.class)
-	public void testAddAlreadyExistingPath() throws Exception {
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath");
-
-		store.add("/testAddAlreadyExistingPath", 1L);
-	}
-
-	/**
-	 * Tests that the created state handle is discarded if ZooKeeper create fails.
-	 */
-	@Test
-	public void testAddDiscardStateHandleAfterFailure() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		CuratorFramework client = spy(ZooKeeper.getClient());
-		when(client.create()).thenThrow(new RuntimeException("Expected test Exception."));
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
-		final Long state = 81282227L;
-
-		try {
-			// Test
-			store.add(pathInZooKeeper, state);
-			fail("Did not throw expected exception");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify
-		// State handle created and discarded
-		assertEquals(1, stateHandleProvider.getStateHandles().size());
-		assertEquals(state, stateHandleProvider.getStateHandles().get(0).retrieveState());
-		assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
-	}
-
-	/**
-	 * Tests that a state handle is replaced.
-	 */
-	@Test
-	public void testReplace() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testReplace";
-		final Long initialState = 30968470898L;
-		final Long replaceState = 88383776661L;
-
-		// Test
-		store.add(pathInZooKeeper, initialState);
-		store.replace(pathInZooKeeper, 0, replaceState);
-
-		// Verify
-		// State handles created
-		assertEquals(2, stateHandleProvider.getStateHandles().size());
-		assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState());
-		assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState());
-
-		// Path created and is persistent
-		Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
-		assertNotNull(stat);
-		assertEquals(0, stat.getEphemeralOwner());
-
-		// Data is equal
-		@SuppressWarnings("unchecked")
-		Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
-				ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-				ClassLoader.getSystemClassLoader())).retrieveState();
-
-		assertEquals(replaceState, actual);
-	}
-
-	/**
-	 * Tests that a non existing path throws an Exception.
-	 */
-	@Test(expected = Exception.class)
-	public void testReplaceNonExistingPath() throws Exception {
-		RetrievableStateStorageHelper<Long> stateStorage = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateStorage, Executors.directExecutor());
-
-		store.replace("/testReplaceNonExistingPath", 0, 1L);
-	}
-
-	/**
-	 * Tests that the replace state handle is discarded if ZooKeeper setData fails.
-	 */
-	@Test
-	public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		CuratorFramework client = spy(ZooKeeper.getClient());
-		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
-		final Long initialState = 30968470898L;
-		final Long replaceState = 88383776661L;
-
-		// Test
-		store.add(pathInZooKeeper, initialState);
-
-		try {
-			store.replace(pathInZooKeeper, 0, replaceState);
-			fail("Did not throw expected exception");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify
-		// State handle created and discarded
-		assertEquals(2, stateHandleProvider.getStateHandles().size());
-		assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState());
-		assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState());
-		assertEquals(1, stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls());
-
-		// Initial value
-		@SuppressWarnings("unchecked")
-		Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
-				ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-				ClassLoader.getSystemClassLoader())).retrieveState();
-
-		assertEquals(initialState, actual);
-	}
-
-	/**
-	 * Tests get operation.
-	 */
-	@Test
-	public void testGetAndExists() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testGetAndExists";
-		final Long state = 311222268470898L;
-
-		// Test
-		assertEquals(-1, store.exists(pathInZooKeeper));
-
-		store.add(pathInZooKeeper, state);
-		RetrievableStateHandle<Long> actual = store.get(pathInZooKeeper);
-
-		// Verify
-		assertEquals(state, actual.retrieveState());
-		assertTrue(store.exists(pathInZooKeeper) >= 0);
-	}
-
-	/**
-	 * Tests that a non existing path throws an Exception.
-	 */
-	@Test(expected = Exception.class)
-	public void testGetNonExistingPath() throws Exception {
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		store.get("/testGetNonExistingPath");
-	}
-
-	/**
-	 * Tests that all added state is returned.
-	 */
-	@Test
-	public void testGetAll() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testGetAll";
-
-		final Set<Long> expected = new HashSet<>();
-		expected.add(311222268470898L);
-		expected.add(132812888L);
-		expected.add(27255442L);
-		expected.add(11122233124L);
-
-		// Test
-		for (long val : expected) {
-			store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL);
-		}
-
-		for (Tuple2<RetrievableStateHandle<Long>, String> val : store.getAll()) {
-			assertTrue(expected.remove(val.f0.retrieveState()));
-		}
-		assertEquals(0, expected.size());
-	}
-
-	/**
-	 * Tests that the state is returned sorted.
-	 */
-	@Test
-	public void testGetAllSortedByName() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testGetAllSortedByName";
-
-		final Long[] expected = new Long[] {
-				311222268470898L, 132812888L, 27255442L, 11122233124L };
-
-		// Test
-		for (long val : expected) {
-			store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL);
-		}
-
-		List<Tuple2<RetrievableStateHandle<Long>, String>> actual = store.getAllSortedByName();
-		assertEquals(expected.length, actual.size());
-
-		for (int i = 0; i < expected.length; i++) {
-			assertEquals(expected[i], actual.get(i).f0.retrieveState());
-		}
-	}
-
-	/**
-	 * Tests that state handles are correctly removed.
-	 */
-	@Test
-	public void testRemove() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testRemove";
-		final Long state = 27255442L;
-
-		store.add(pathInZooKeeper, state);
-
-		// Test
-		store.remove(pathInZooKeeper);
-
-		// Verify discarded
-		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
-	}
-
-	/**
-	 * Tests that state handles are correctly removed with a callback.
-	 */
-	@Test
-	public void testRemoveWithCallback() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testRemoveWithCallback";
-		final Long state = 27255442L;
-
-		store.add(pathInZooKeeper, state);
-
-		final CountDownLatch sync = new CountDownLatch(1);
-		BackgroundCallback callback = mock(BackgroundCallback.class);
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				sync.countDown();
-				return null;
-			}
-		}).when(callback).processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));
-
-		// Test
-		store.remove(pathInZooKeeper, callback);
-
-		// Verify discarded and callback called
-		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
-
-		sync.await();
-
-		verify(callback, times(1))
-				.processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));
-	}
-
-	/**
-	 * Tests that state handles are correctly discarded.
-	 */
-	@Test
-	public void testRemoveAndDiscardState() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testDiscard";
-		final Long state = 27255442L;
-
-		store.add(pathInZooKeeper, state);
-
-		// Test
-		store.removeAndDiscardState(pathInZooKeeper);
-
-		// Verify discarded
-		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
-	}
-
-	/** Tests that all state handles are correctly discarded. */
-	@Test
-	public void testRemoveAndDiscardAllState() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testDiscardAll";
-
-		final Set<Long> expected = new HashSet<>();
-		expected.add(311222268470898L);
-		expected.add(132812888L);
-		expected.add(27255442L);
-		expected.add(11122233124L);
-
-		// Test
-		for (long val : expected) {
-			store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL);
-		}
-
-		store.removeAndDiscardAllState();
-
-		// Verify all discarded
-		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
-	}
-
-	/**
-	 * Tests that the ZooKeeperStateHandleStore can handle corrupted data by ignoring the respective
-	 * ZooKeeper ZNodes.
-	 */
-	@Test
-	public void testCorruptedData() throws Exception {
-		LongStateStorage stateStorage = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-			ZooKeeper.getClient(),
-			stateStorage,
-			Executors.directExecutor());
-
-		final Collection<Long> input = new HashSet<>();
-		input.add(1L);
-		input.add(2L);
-		input.add(3L);
-
-		for (Long aLong : input) {
-			store.add("/" + aLong, aLong);
-		}
-
-		// corrupt one of the entries
-		ZooKeeper.getClient().setData().forPath("/" + 2, new byte[2]);
-
-		List<Tuple2<RetrievableStateHandle<Long>, String>> allEntries = store.getAll();
-
-		Collection<Long> expected = new HashSet<>(input);
-		expected.remove(2L);
-
-		Collection<Long> actual = new HashSet<>(expected.size());
-
-		for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
-			actual.add(entry.f0.retrieveState());
-		}
-
-		assertEquals(expected, actual);
-
-		// check the same for the all sorted by name call
-		allEntries = store.getAllSortedByName();
-
-		actual.clear();
-
-		for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
-			actual.add(entry.f0.retrieveState());
-		}
-
-		assertEquals(expected, actual);
-	}
-
-	// ---------------------------------------------------------------------------------------------
-	// Simple test helpers
-	// ---------------------------------------------------------------------------------------------
-
-	private static class LongStateStorage implements RetrievableStateStorageHelper<Long> {
-
-		private final List<LongRetrievableStateHandle> stateHandles = new ArrayList<>();
-
-		@Override
-		public RetrievableStateHandle<Long> store(Long state) throws Exception {
-			LongRetrievableStateHandle stateHandle = new LongRetrievableStateHandle(state);
-			stateHandles.add(stateHandle);
-
-			return stateHandle;
-		}
-
-		List<LongRetrievableStateHandle> getStateHandles() {
-			return stateHandles;
-		}
-	}
-
-	private static class LongRetrievableStateHandle implements RetrievableStateHandle<Long> {
-
-		private static final long serialVersionUID = -3555329254423838912L;
-
-		private final Long state;
-
-		private int numberOfDiscardCalls;
-
-		public LongRetrievableStateHandle(Long state) {
-			this.state = state;
-		}
-
-		@Override
-		public Long retrieveState() throws Exception {
-			return state;
-		}
-
-		@Override
-		public void discardState() throws Exception {
-			numberOfDiscardCalls++;
-		}
-
-		@Override
-		public long getStateSize() {
-			return 0;
-		}
-
-		public int getNumberOfDiscardCalls() {
-			return numberOfDiscardCalls;
-		}
-	}
-}