You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/12 14:43:23 UTC

[flink] branch master updated (c87b943 -> a933d87)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c87b943  [FLINK-10281] [table] Fix string literal escaping throughout Table & SQL API
     new 4cd3311  [hotfix] Fix checkstyle violations in ZooKeeperStateHandleStore
     new 0a345eb  [hotfix] Fix checkstyle violations in ZooKeeperCompletedCheckpointStore
     new a0b6685  [FLINK-10185] Make ZooKeeperStateHandleStore#releaseAndTryRemove synchronous
     new 7ee79d9  [hotfix] Fix checkstyle violations in ZooKeeperUtils
     new cb69af1  [FLINK-10011] Introduce SubmittedJobGraphStore#releaseJobGraph
     new c395e89  [FLINK-10011] Release JobGraph after losing leadership in JobManager
     new 05b11f9  [hotfix][tests] Ensure that JobManagerRunners are stopped when Dispatcher loses leadership
     new a933d87  [FLINK-10011] Release JobGraph from SubmittedJobGraphStore in Dispatcher

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../services/ZooKeeperMesosServices.java           |   3 +-
 .../org/apache/flink/runtime/akka/ActorUtils.java  |  10 +
 .../ZooKeeperCompletedCheckpointStore.java         | 103 +++-----
 .../flink/runtime/dispatcher/Dispatcher.java       |  59 +++--
 .../SingleJobSubmittedJobGraphStore.java           |   9 +-
 .../zookeeper/ZooKeeperHaServices.java             |   2 +-
 .../StandaloneSubmittedJobGraphStore.java          |  13 +-
 .../runtime/jobmanager/SubmittedJobGraphStore.java |  12 +
 .../ZooKeeperSubmittedJobGraphStore.java           |  31 ++-
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |  17 +-
 .../zookeeper/ZooKeeperStateHandleStore.java       | 172 ++----------
 .../runtime/zookeeper/ZooKeeperUtilityFactory.java |  14 +-
 .../flink/runtime/jobmanager/JobManager.scala      |  39 +--
 .../checkpoint/CompletedCheckpointStoreTest.java   |   9 +-
 ...KeeperCompletedCheckpointStoreMockitoTest.java} |  19 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java     | 288 +++++++--------------
 .../flink/runtime/dispatcher/DispatcherHATest.java |  89 ++++++-
 .../dispatcher/NoOpSubmittedJobGraphListener.java} |  24 +-
 .../runtime/dispatcher/TestingDispatcher.java      |  13 +
 .../dispatcher/ZooKeeperHADispatcherTest.java      | 203 +++++++++++++++
 .../flink/runtime/jobmanager/JobManagerTest.java   |   3 +-
 .../jobmanager/ZooKeeperHAJobManagerTest.java      | 180 +++++++++++++
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java    |  14 +-
 .../testutils/InMemorySubmittedJobGraphStore.java  |   5 +
 .../ZooKeeperResource.java}                        |  57 ++--
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 115 +++-----
 .../testingUtils/TestingJobManagerLike.scala       |   8 +
 .../testingUtils/TestingJobManagerMessages.scala   |   3 +
 28 files changed, 881 insertions(+), 633 deletions(-)
 copy flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/{ZooKeeperCompletedCheckpointStoreTest.java => ZooKeeperCompletedCheckpointStoreMockitoTest.java} (95%)
 copy flink-runtime/src/{main/java/org/apache/flink/runtime/dispatcher/DispatcherException.java => test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java} (63%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
 copy flink-runtime/src/test/java/org/apache/flink/runtime/{util/BlobServerResource.java => zookeeper/ZooKeeperResource.java} (50%)


[flink] 07/08: [hotfix][tests] Ensure that JobManagerRunners are stopped when Dispatcher loses leadership

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 05b11f9e74ca4c7e562d92eeff69fab8b88c6ee8
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Aug 17 15:24:45 2018 +0200

    [hotfix][tests] Ensure that JobManagerRunners are stopped when Dispatcher loses leadership
---
 .../flink/runtime/dispatcher/DispatcherHATest.java | 75 ++++++++++++++++++----
 1 file changed, 64 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index adf7618..cb26f48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -30,9 +30,11 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -111,8 +113,6 @@ public class DispatcherHATest extends TestLogger {
 	 */
 	@Test
 	public void testGrantingRevokingLeadership() throws Exception {
-
-		final Configuration configuration = new Configuration();
 		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		final JobGraph nonEmptyJobGraph = createNonEmptyJobGraph();
 		final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(nonEmptyJobGraph, null);
@@ -125,7 +125,34 @@ public class DispatcherHATest extends TestLogger {
 
 		final BlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
 
-		final HATestingDispatcher dispatcher = new HATestingDispatcher(
+		final HATestingDispatcher dispatcher = createHADispatcher(highAvailabilityServices, fencingTokens);
+
+		dispatcher.start();
+
+		try {
+			final UUID leaderId = UUID.randomUUID();
+			dispatcherLeaderElectionService.isLeader(leaderId);
+
+			dispatcherLeaderElectionService.notLeader();
+
+			final DispatcherId firstFencingToken = fencingTokens.take();
+
+			assertThat(firstFencingToken, equalTo(NULL_FENCING_TOKEN));
+
+			enterGetJobIdsLatch.await();
+			proceedGetJobIdsLatch.trigger();
+
+			assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
+
+		} finally {
+			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+		}
+	}
+
+	@Nonnull
+	private HATestingDispatcher createHADispatcher(TestingHighAvailabilityServices highAvailabilityServices, BlockingQueue<DispatcherId> fencingTokens) throws Exception {
+		final Configuration configuration = new Configuration();
+		return new HATestingDispatcher(
 			rpcService,
 			UUID.randomUUID().toString(),
 			configuration,
@@ -139,24 +166,50 @@ public class DispatcherHATest extends TestLogger {
 			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
 			testingFatalErrorHandler,
 			fencingTokens);
+	}
+
+	/**
+	 * Tests that all JobManagerRunner are terminated if the leadership of the
+	 * Dispatcher is revoked.
+	 */
+	@Test
+	public void testRevokeLeadershipTerminatesJobManagerRunners() throws Exception {
+
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		highAvailabilityServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore());
+
+		final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+		final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
+		final HATestingDispatcher dispatcher = createHADispatcher(
+			highAvailabilityServices,
+			fencingTokens);
 
 		dispatcher.start();
 
 		try {
-			final UUID leaderId = UUID.randomUUID();
-			dispatcherLeaderElectionService.isLeader(leaderId);
+			// grant leadership and submit a single job
+			final DispatcherId expectedDispatcherId = DispatcherId.generate();
 
-			dispatcherLeaderElectionService.notLeader();
+			leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
 
-			final DispatcherId firstFencingToken = fencingTokens.take();
+			assertThat(fencingTokens.take(), is(equalTo(expectedDispatcherId)));
 
-			assertThat(firstFencingToken, equalTo(NULL_FENCING_TOKEN));
+			final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-			enterGetJobIdsLatch.await();
-			proceedGetJobIdsLatch.trigger();
+			final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(createNonEmptyJobGraph(), timeout);
 
-			assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
+			submissionFuture.get();
+
+			assertThat(dispatcher.getNumberJobs(timeout).get(), is(1));
 
+			// revoke the leadership --> this should stop all running JobManagerRunners
+			leaderElectionService.notLeader();
+
+			assertThat(fencingTokens.take(), is(equalTo(NULL_FENCING_TOKEN)));
+
+			assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
 		} finally {
 			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
 		}


[flink] 01/08: [hotfix] Fix checkstyle violations in ZooKeeperStateHandleStore

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4cd3311421e88fa677bae8b946f40b897e1ed2e7
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Aug 20 09:46:31 2018 +0200

    [hotfix] Fix checkstyle violations in ZooKeeperStateHandleStore
---
 .../zookeeper/ZooKeeperStateHandleStore.java       | 24 ++++++++++++----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 87a433a..e151a11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -18,17 +18,18 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -36,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -68,13 +70,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * State handle in ZooKeeper =&gt; State handle exists
  * </pre>
  *
- * But not:
+ * <p>But not:
  *
  * <pre>
  * State handle exists =&gt; State handle in ZooKeeper
  * </pre>
  *
- * There can be lingering state handles when failures happen during operation. They
+ * <p>There can be lingering state handles when failures happen during operation. They
  * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
  * FLINK-2513</a> about a possible way to overcome this).
  *
@@ -84,7 +86,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-	/** Curator ZooKeeper client */
+	/** Curator ZooKeeper client. */
 	private final CuratorFramework client;
 
 	private final RetrievableStateStorageHelper<T> storage;
@@ -262,7 +264,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	public Collection<String> getAllPaths() throws Exception {
 		final String path = "/";
 
-		while(true) {
+		while (true) {
 			Stat stat = client.checkExists().forPath(path);
 
 			if (stat == null) {
@@ -583,7 +585,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	}
 
 	/**
-	 * Makes sure that every path starts with a "/"
+	 * Makes sure that every path starts with a "/".
 	 *
 	 * @param path Path to normalize
 	 * @return Normalized path such that it starts with a "/"
@@ -682,7 +684,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	}
 
 	/**
-	 * Callback interface for remove calls
+	 * Callback interface for remove calls.
 	 */
 	public interface RemoveCallback<T extends Serializable> {
 		/**


[flink] 04/08: [hotfix] Fix checkstyle violations in ZooKeeperUtils

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ee79d930a44ca216e42ce2438cd62ced1c5be67
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Aug 17 16:28:02 2018 +0200

    [hotfix] Fix checkstyle violations in ZooKeeperUtils
---
 .../main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java  | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index d9c9161..cc1ec70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -54,6 +54,9 @@ import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * Class containing helper functions to interact with ZooKeeper.
+ */
 public class ZooKeeperUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
@@ -344,6 +347,9 @@ public class ZooKeeperUtils {
 		return root + namespace;
 	}
 
+	/**
+	 * Secure {@link ACLProvider} implementation.
+	 */
 	public static class SecureAclProvider implements ACLProvider {
 		@Override
 		public List<ACL> getDefaultAcl() {
@@ -356,6 +362,9 @@ public class ZooKeeperUtils {
 		}
 	}
 
+	/**
+	 * ZooKeeper client ACL mode enum.
+	 */
 	public enum ZkClientACLMode {
 		CREATOR,
 		OPEN;


[flink] 03/08: [FLINK-10185] Make ZooKeeperStateHandleStore#releaseAndTryRemove synchronous

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a0b668555f5ee407d3cdbc42113cc28f019fc318
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Aug 20 11:19:18 2018 +0200

    [FLINK-10185] Make ZooKeeperStateHandleStore#releaseAndTryRemove synchronous
    
    Remove the asynchronous callback from ZooKeeperStateHandleStore#releaseAndTryRemove.
    Instead we can execute the callback after having executed the releaseAndTryRemove
    method successfully. This separates concerns better because we don't mix storage
    with business logic. Furthermore, we can still avoid blocking operations if we use a
    separate thread to call into ZooKeeperStateHandleStore#releaseAndTryRemove.
    
    This closes #6586.
---
 .../services/ZooKeeperMesosServices.java           |   3 +-
 .../ZooKeeperCompletedCheckpointStore.java         | 101 +++-----
 .../zookeeper/ZooKeeperHaServices.java             |   2 +-
 .../ZooKeeperSubmittedJobGraphStore.java           |   7 +-
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |   8 +-
 .../zookeeper/ZooKeeperStateHandleStore.java       | 156 ++---------
 .../runtime/zookeeper/ZooKeeperUtilityFactory.java |  14 +-
 .../checkpoint/CompletedCheckpointStoreTest.java   |   9 +-
 ...KeeperCompletedCheckpointStoreMockitoTest.java} |  19 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java     | 288 +++++++--------------
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java    |  14 +-
 .../flink/runtime/zookeeper/ZooKeeperResource.java |  72 ++++++
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 115 +++-----
 13 files changed, 283 insertions(+), 525 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb83..45d1141 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public class ZooKeeperMesosServices extends AbstractMesosServices {
 
 		ZooKeeperStateHandleStore<MesosWorkerStore.Worker> zooKeeperStateHandleStore = zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
 			"/workers",
-			stateStorageHelper,
-			executor);
+			stateStorageHelper);
 
 		ZooKeeperSharedValue frameworkId = zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
 		ZooKeeperSharedCount totalTaskCount = zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index deb1ab3..1317339 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,14 +25,13 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ConsumerWithException;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +85,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 */
 	private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
+	private final Executor executor;
+
 	/**
 	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 	 *
@@ -98,7 +99,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 *                                       start with a '/')
 	 * @param stateStorage                   State storage to be used to persist the completed
 	 *                                       checkpoint
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
+	 * @param executor to execute blocking calls
 	 * @throws Exception
 	 */
 	public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +124,12 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		// All operations will have the path as root
 		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
 
-		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
+		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);
 
 		this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 
+		this.executor = checkNotNull(executor);
+
 		LOG.info("Initialized in '{}'.", checkpointsPath);
 	}
 
@@ -236,16 +239,30 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 
 		// Everything worked, let's remove a previous checkpoint if necessary.
 		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
-			try {
-				removeSubsumed(completedCheckpoints.removeFirst());
-			} catch (Exception e) {
-				LOG.warn("Failed to subsume the old checkpoint", e);
-			}
+			final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
+			tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume);
 		}
 
 		LOG.debug("Added {} to {}.", checkpoint, path);
 	}
 
+	private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> discardCallback) {
+		try {
+			if (tryRemove(completedCheckpoint.getCheckpointID())) {
+				executor.execute(() -> {
+					try {
+						discardCallback.accept(completedCheckpoint);
+					} catch (Exception e) {
+						LOG.warn("Could not discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
+					}
+				});
+
+			}
+		} catch (Exception e) {
+			LOG.warn("Failed to subsume the old checkpoint", e);
+		}
+	}
+
 	@Override
 	public CompletedCheckpoint getLatestCheckpoint() {
 		if (completedCheckpoints.isEmpty()) {
@@ -278,11 +295,9 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			LOG.info("Shutting down");
 
 			for (CompletedCheckpoint checkpoint : completedCheckpoints) {
-				try {
-					removeShutdown(checkpoint, jobStatus);
-				} catch (Exception e) {
-					LOG.error("Failed to discard checkpoint.", e);
-				}
+				tryRemoveCompletedCheckpoint(
+					checkpoint,
+					completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus));
 			}
 
 			completedCheckpoints.clear();
@@ -305,59 +320,13 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Removes a subsumed checkpoint from ZooKeeper and drops the state.
-	 */
-	private void removeSubsumed(
-		final CompletedCheckpoint completedCheckpoint) throws Exception {
-
-		if (completedCheckpoint == null) {
-			return;
-		}
-
-		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action =
-			new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-				@Override
-				public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-					if (value != null) {
-						try {
-							completedCheckpoint.discardOnSubsume();
-						} catch (Exception e) {
-							throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
-						}
-					}
-				}
-			};
-
-		checkpointsInZooKeeper.releaseAndTryRemove(
-			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-			action);
-	}
-
-	/**
-	 * Removes a checkpoint from ZooKeeper because of Job shutdown and drops the state.
+	 * Tries to remove the checkpoint identified by the given checkpoint id.
+	 *
+	 * @param checkpointId identifying the checkpoint to remove
+	 * @return true if the checkpoint could be removed
 	 */
-	private void removeShutdown(
-			final CompletedCheckpoint completedCheckpoint,
-			final JobStatus jobStatus) throws Exception {
-
-		if (completedCheckpoint == null) {
-			return;
-		}
-
-		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-			@Override
-			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-				try {
-					completedCheckpoint.discardOnShutdown(jobStatus);
-				} catch (Exception e) {
-					throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
-				}
-			}
-		};
-
-		checkpointsInZooKeeper.releaseAndTryRemove(
-			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-			removeAction);
+	private boolean tryRemove(long checkpointId) throws Exception {
+		return checkpointsInZooKeeper.releaseAndTryRemove(checkpointIdToPath(checkpointId));
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 3882479..ea96d7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -179,7 +179,7 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
 
 	@Override
 	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
-		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
+		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 7ba5d48..0510815 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -41,7 +41,6 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -99,14 +98,12 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	 * @param client ZooKeeper client
 	 * @param currentJobsPath ZooKeeper path for current job graphs
 	 * @param stateStorage State storage used to persist the submitted jobs
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
 	 * @throws Exception
 	 */
 	public ZooKeeperSubmittedJobGraphStore(
 			CuratorFramework client,
 			String currentJobsPath,
-			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage,
-			Executor executor) throws Exception {
+			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
 
 		checkNotNull(currentJobsPath, "Current jobs path");
 		checkNotNull(stateStorage, "State storage");
@@ -123,7 +120,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
 		this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
-		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
+		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
 		pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 43c930e..d9c9161 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -227,14 +227,12 @@ public class ZooKeeperUtils {
 	 *
 	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 	 * @param configuration {@link Configuration} object
-	 * @param executor to run ZooKeeper callbacks
 	 * @return {@link ZooKeeperSubmittedJobGraphStore} instance
 	 * @throws Exception if the submitted job graph store cannot be created
 	 */
 	public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
 			CuratorFramework client,
-			Configuration configuration,
-			Executor executor) throws Exception {
+			Configuration configuration) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -244,7 +242,9 @@ public class ZooKeeperUtils {
 		String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
-				client, zooKeeperSubmittedJobsPath, stateStorage, executor);
+			client,
+			zooKeeperSubmittedJobsPath,
+			stateStorage);
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index e151a11..8c3d31f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -21,14 +21,9 @@ package org.apache.flink.runtime.zookeeper;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -46,7 +41,6 @@ import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -91,8 +85,6 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 	private final RetrievableStateStorageHelper<T> storage;
 
-	private final Executor executor;
-
 	/** Lock node name of this ZooKeeperStateHandleStore. The name should be unique among all other state handle stores. */
 	private final String lockNode;
 
@@ -105,16 +97,13 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 *                            instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
 	 * @param storage to persist the actual state and whose returned state handle is then written
 	 *                to ZooKeeper
-	 * @param executor to run the ZooKeeper callbacks
 	 */
 	public ZooKeeperStateHandleStore(
 		CuratorFramework client,
-		RetrievableStateStorageHelper<T> storage,
-		Executor executor) {
+		RetrievableStateStorageHelper<T> storage) {
 
 		this.client = checkNotNull(client, "Curator client");
 		this.storage = checkNotNull(storage, "State storage");
-		this.executor = checkNotNull(executor);
 
 		// Generate a unique lock node name
 		lockNode = UUID.randomUUID().toString();
@@ -395,33 +384,14 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 	/**
 	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
-	 * The deletion of the state node is executed asynchronously.
-	 *
-	 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
-	 * has been executed.
-	 *
-	 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
-	 * @throws Exception If the ZooKeeper operation fails
-	 */
-	public void releaseAndTryRemove(String pathInZooKeeper) throws Exception {
-		releaseAndTryRemove(pathInZooKeeper, null);
-	}
-
-	/**
-	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
-	 * The deletion of the state node is executed asynchronously. After the state node has been deleted, the given
-	 * callback is called with the {@link RetrievableStateHandle} of the deleted state node.
-	 *
-	 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
-	 * has been executed.
+	 * It returns the {@link RetrievableStateHandle} stored under the given state node if any.
 	 *
 	 * @param pathInZooKeeper Path of state handle to remove
-	 * @param callback The callback to execute after a successful deletion. Null if no action needs to be executed.
-	 * @throws Exception If the ZooKeeper operation fails
+	 * @return True if the state handle could be released
+	 * @throws Exception If the ZooKeeper operation or discarding the state handle fails
 	 */
-	public void releaseAndTryRemove(
-			String pathInZooKeeper,
-			@Nullable final RemoveCallback<T> callback) throws Exception {
+	@Nullable
+	public boolean releaseAndTryRemove(String pathInZooKeeper) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
 		final String path = normalizePath(pathInZooKeeper);
@@ -431,14 +401,23 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		try {
 			stateHandle = get(path, false);
 		} catch (Exception e) {
-			LOG.warn("Could not retrieve the state handle from node " + path + '.', e);
+			LOG.warn("Could not retrieve the state handle from node {}.", path, e);
 		}
 
 		release(pathInZooKeeper);
 
-		final BackgroundCallback backgroundCallback = new RemoveBackgroundCallback<>(stateHandle, callback, path);
+		try {
+			client.delete().forPath(path);
+		} catch (KeeperException.NotEmptyException ignored) {
+			LOG.debug("Could not delete znode {} because it is still locked.", path);
+			return false;
+		}
+
+		if (stateHandle != null) {
+			stateHandle.discardState();
+		}
 
-		client.delete().inBackground(backgroundCallback, executor).forPath(path);
+		return true;
 	}
 
 	/**
@@ -597,103 +576,4 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 			return '/' + path;
 		}
 	}
-
-	// ---------------------------------------------------------------------------------------------------------
-	// Utility classes
-	// ---------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Callback which is executed when removing a node from ZooKeeper. The callback will call the given
-	 * {@link RemoveCallback} if it is not null. Afterwards, it will discard the given {@link RetrievableStateHandle}
-	 * if it is not null.
-	 *
-	 * @param <T> Type of the value stored in the RetrievableStateHandle
-	 */
-	private static final class RemoveBackgroundCallback<T extends Serializable> implements BackgroundCallback {
-		@Nullable
-		private final RetrievableStateHandle<T> stateHandle;
-
-		@Nullable
-		private final RemoveCallback<T> callback;
-
-		private final String pathInZooKeeper;
-
-		private RemoveBackgroundCallback(
-			@Nullable RetrievableStateHandle<T> stateHandle,
-			@Nullable RemoveCallback<T> callback,
-			String pathInZooKeeper) {
-
-			this.stateHandle = stateHandle;
-			this.callback = callback;
-			this.pathInZooKeeper = Preconditions.checkNotNull(pathInZooKeeper);
-		}
-
-		@Override
-		public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
-			try {
-				if (event.getType() == CuratorEventType.DELETE) {
-					final KeeperException.Code resultCode = KeeperException.Code.get(event.getResultCode());
-
-					if (resultCode == KeeperException.Code.OK) {
-						Exception exception = null;
-
-						if (null != callback) {
-							try {
-								callback.apply(stateHandle);
-							} catch (Throwable e) {
-								exception = new Exception("Could not execute delete action for node " +
-									pathInZooKeeper + '.', e);
-							}
-						}
-
-						if (stateHandle != null) {
-							try {
-								// Discard the state handle
-								stateHandle.discardState();
-							} catch (Throwable e) {
-								Exception newException = new Exception("Could not discard state handle of node " +
-									pathInZooKeeper + '.', e);
-
-								if (exception == null) {
-									exception = newException;
-								} else {
-									exception.addSuppressed(newException);
-								}
-							}
-						}
-
-						if (exception != null) {
-							throw exception;
-						}
-					} else if (resultCode == KeeperException.Code.NOTEMPTY) {
-						// Could not delete the node because it still contains children/locks
-						LOG.debug("Could not delete node " + pathInZooKeeper + " because it is still locked.");
-					} else {
-						throw new IllegalStateException("Unexpected result code " +
-							resultCode.name() + " in '" + event + "' callback.");
-					}
-				} else {
-					throw new IllegalStateException("Unexpected event type " +
-						event.getType() + " in '" + event + "' callback.");
-				}
-			} catch (Exception e) {
-				LOG.warn("Failed to run callback for delete operation on node " + pathInZooKeeper + '.', e);
-			}
-
-		}
-	}
-
-	/**
-	 * Callback interface for remove calls.
-	 */
-	public interface RemoveCallback<T extends Serializable> {
-		/**
-		 * Callback method. The parameter can be null if the {@link RetrievableStateHandle} could not be retrieved
-		 * from ZooKeeper.
-		 *
-		 * @param value RetrievableStateHandle retrieved from ZooKeeper, null if it was not retrievable
-		 * @throws FlinkException If the callback failed
-		 */
-		void apply(@Nullable RetrievableStateHandle<T> value) throws FlinkException;
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
index d3b7dc5..3e294e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.shared.SharedCount;
-import org.apache.curator.framework.recipes.shared.SharedValue;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+
 import java.io.Serializable;
-import java.util.concurrent.Executor;
 
 /**
  * Creates ZooKeeper utility classes without exposing the {@link CuratorFramework} dependency. The
@@ -71,7 +71,6 @@ public class ZooKeeperUtilityFactory {
 	 *
 	 * @param zkStateHandleStorePath specifying the path in ZooKeeper to store the state handles to
 	 * @param stateStorageHelper storing the actual state data
-	 * @param executor to run asynchronous callbacks of the state handle store
 	 * @param <T> Type of the state to be stored
 	 * @return a ZooKeeperStateHandleStore instance
 	 * @throws Exception if ZooKeeper could not create the provided state handle store path in
@@ -79,8 +78,7 @@ public class ZooKeeperUtilityFactory {
 	 */
 	public <T extends Serializable> ZooKeeperStateHandleStore<T> createZooKeeperStateHandleStore(
 			String zkStateHandleStorePath,
-			RetrievableStateStorageHelper<T> stateStorageHelper,
-			Executor executor) throws Exception {
+			RetrievableStateStorageHelper<T> stateStorageHelper) throws Exception {
 
 		facade.newNamespaceAwareEnsurePath(zkStateHandleStorePath).ensure(facade.getZookeeperClient());
 		CuratorFramework stateHandleStoreFacade = facade.usingNamespace(
@@ -88,7 +86,7 @@ public class ZooKeeperUtilityFactory {
 				facade.getNamespace(),
 				zkStateHandleStorePath));
 
-		return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper, executor);
+		return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper);
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 8156964..c4d8903 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -193,7 +193,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 
 	// ---------------------------------------------------------------------------------------------
 
-	protected TestCompletedCheckpoint createCheckpoint(
+	public static TestCompletedCheckpoint createCheckpoint(
 		int id,
 		SharedStateRegistry sharedStateRegistry) throws IOException {
 
@@ -226,7 +226,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		}
 	}
 
-	protected void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
+	public static void verifyCheckpointDiscarded(TestCompletedCheckpoint completedCheckpoint) {
+		assertTrue(completedCheckpoint.isDiscarded());
+		verifyCheckpointDiscarded(completedCheckpoint.getOperatorStates().values());
+	}
+
+	protected static void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
 		for (OperatorState operatorState : operatorStates) {
 			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
 				Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).discarded);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
similarity index 95%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
copy to flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index 0384733..1f7d369 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -66,20 +66,11 @@ import static org.powermock.api.mockito.PowerMockito.doThrow;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
- * Tests for {@link ZooKeeperCompletedCheckpointStore}.
+ * Mockito based tests for the {@link ZooKeeperStateHandleStore}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
-public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
-
-	@Test
-	public void testPathConversion() {
-		final long checkpointId = 42L;
-
-		final String path = ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpointId);
-
-		assertEquals(checkpointId, ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path));
-	}
+public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
 
 	/**
 	 * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
@@ -133,7 +124,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
 		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
 
-		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
+		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
 		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
 		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
 
@@ -221,7 +212,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
 
 		ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
-			spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
+			spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
 		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
 
 		doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
@@ -236,7 +227,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 			}
 		}).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class));
 
-		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(), any(ZooKeeperStateHandleStore.RemoveCallback.class));
+		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString());
 
 		final int numCheckpointsToRetain = 1;
 		final String checkpointsPath = "foobar";
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 0384733..f992d3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,60 +18,39 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
-import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.ErrorListenerPathable;
-import org.apache.curator.utils.EnsurePath;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.io.Serializable;
 import java.util.List;
-import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for {@link ZooKeeperCompletedCheckpointStore}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
 public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 
+	@ClassRule
+	public static ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
+
 	@Test
 	public void testPathConversion() {
 		final long checkpointId = 42L;
@@ -82,188 +61,103 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
-	 * and ignores those which cannot be retrieved via their state handles.
-	 *
-	 * <p>We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation.
+	 * Tests that subsumed checkpoints are discarded.
 	 */
-	@Test(timeout = 50000)
-	public void testCheckpointRecovery() throws Exception {
-		final JobID jobID = new JobID();
-		final long checkpoint1Id = 1L;
-		final long checkpoint2Id = 2;
-		final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4);
-
-		final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
-		expectedCheckpointIds.add(1L);
-		expectedCheckpointIds.add(2L);
-
-		final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
-		when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
-
-		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
-		when(retrievableStateHandle1.retrieveState()).then(
-			(invocation) -> new CompletedCheckpoint(
-				jobID,
-				checkpoint1Id,
-				1L,
-				1L,
-				new HashMap<>(),
-				null,
-				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-				new TestCompletedCheckpointStorageLocation()));
-
-		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class);
-		when(retrievableStateHandle2.retrieveState()).then(
-			(invocation -> new CompletedCheckpoint(
-				jobID,
-				checkpoint2Id,
-				2L,
-				2L,
-				new HashMap<>(),
-				null,
-				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-				new TestCompletedCheckpointStorageLocation())));
-
-		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
-		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
-		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
-		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
-
-		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
-		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
-
-		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
-		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
-
-		final int numCheckpointsToRetain = 1;
-
-		// Mocking for the delete operation on the CuratorFramework client
-		// It assures that the callback is executed synchronously
-
-		final EnsurePath ensurePathMock = mock(EnsurePath.class);
-		final CuratorEvent curatorEventMock = mock(CuratorEvent.class);
-		when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE);
-		when(curatorEventMock.getResultCode()).thenReturn(0);
-		when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock);
-
-		when(
-			client
-				.delete()
-				.inBackground(any(BackgroundCallback.class), any(Executor.class))
-		).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
-			@Override
-			public ErrorListenerPathable<Void> answer(InvocationOnMock invocation) throws Throwable {
-				final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0];
+	@Test
+	public void testDiscardingSubsumedCheckpoints() throws Exception {
+		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
+
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperCompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client);
+
+		try {
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
+
+			checkpointStore.addCheckpoint(checkpoint1);
+			assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));
+
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint2 = CompletedCheckpointStoreTest.createCheckpoint(1, sharedStateRegistry);
+			checkpointStore.addCheckpoint(checkpoint2);
+			final List<CompletedCheckpoint> allCheckpoints = checkpointStore.getAllCheckpoints();
+			assertThat(allCheckpoints, Matchers.contains(checkpoint2));
+			assertThat(allCheckpoints, Matchers.not(Matchers.contains(checkpoint1)));
+
+			// verify that the subsumed checkpoint is discarded
+			CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+		} finally {
+			client.close();
+		}
+	}
 
-				ErrorListenerPathable<Void> result = mock(ErrorListenerPathable.class);
+	/**
+	 * Tests that checkpoints are discarded when the completed checkpoint store is shut
+	 * down with a globally terminal state.
+	 */
+	@Test
+	public void testDiscardingCheckpointsAtShutDown() throws Exception {
+		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
 
-				when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
-					@Override
-					public Void answer(InvocationOnMock invocation) throws Throwable {
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final ZooKeeperCompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client);
 
-						callback.processResult(client, curatorEventMock);
+		try {
+			final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
 
-						return null;
-					}
-				});
+			checkpointStore.addCheckpoint(checkpoint1);
+			assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));
 
-				return result;
-			}
-		});
+			checkpointStore.shutdown(JobStatus.FINISHED);
 
-		final String checkpointsPath = "foobar";
-		final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);
+			// verify that the checkpoint is discarded
+			CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+		} finally {
+			client.close();
+		}
+	}
 
-		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-			numCheckpointsToRetain,
+	@Nonnull
+	private ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore(CuratorFramework client) throws Exception {
+		return new ZooKeeperCompletedCheckpointStore(
+			1,
 			client,
-			checkpointsPath,
-			stateStorage,
+			"/checkpoints",
+			new TestingRetrievableStateStorageHelper<>(),
 			Executors.directExecutor());
+	}
 
-		zooKeeperCompletedCheckpointStore.recover();
-
-		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
-
-		// check that we return the latest retrievable checkpoint
-		// this should remove the latest checkpoint because it is broken
-		assertEquals(checkpoint2Id, latestCompletedCheckpoint.getCheckpointID());
-
-		// this should remove the second broken checkpoint because we're iterating over all checkpoints
-		List<CompletedCheckpoint> completedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-
-		Collection<Long> actualCheckpointIds = new HashSet<>(completedCheckpoints.size());
-
-		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
-			actualCheckpointIds.add(completedCheckpoint.getCheckpointID());
+	private static final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
+		@Override
+		public RetrievableStateHandle<T> store(T state) {
+			return new TestingRetrievableStateHandle<>(state);
 		}
 
-		assertEquals(expectedCheckpointIds, actualCheckpointIds);
-
-		// check that we did not discard any of the state handles
-		verify(retrievableStateHandle1, never()).discardState();
-		verify(retrievableStateHandle2, never()).discardState();
+		private static class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
 
-		// Make sure that we also didn't discard any of the broken handles. Only when checkpoints
-		// are subsumed should they be discarded.
-		verify(failingRetrievableStateHandle, never()).discardState();
-	}
+			private static final long serialVersionUID = 137053380713794300L;
 
-	/**
-	 * Tests that the checkpoint does not exist in the store when we fail to add
-	 * it into the store (i.e., there exists an exception thrown by the method).
-	 */
-	@Test
-	public void testAddCheckpointWithFailedRemove() throws Exception {
-		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
-		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+			private final T state;
 
-		ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
-			spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
-		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
+			private TestingRetrievableStateHandle(T state) {
+				this.state = state;
+			}
 
-		doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
 			@Override
-			public RetrievableStateHandle<CompletedCheckpoint> answer(InvocationOnMock invocationOnMock) throws Throwable {
-				CompletedCheckpoint checkpoint = (CompletedCheckpoint) invocationOnMock.getArguments()[1];
-
-				RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle = mock(RetrievableStateHandle.class);
-				when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
-
-				return retrievableStateHandle;
+			public T retrieveState() throws IOException, ClassNotFoundException {
+				return state;
 			}
-		}).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class));
-
-		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(), any(ZooKeeperStateHandleStore.RemoveCallback.class));
 
-		final int numCheckpointsToRetain = 1;
-		final String checkpointsPath = "foobar";
-		final RetrievableStateStorageHelper<CompletedCheckpoint> stateSotrage = mock(RetrievableStateStorageHelper.class);
-
-		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-			numCheckpointsToRetain,
-			client,
-			checkpointsPath,
-			stateSotrage,
-			Executors.directExecutor());
+			@Override
+			public void discardState() throws Exception {
+				// no op
+			}
 
-		for (long i = 0; i <= numCheckpointsToRetain; ++i) {
-			CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
-			doReturn(i).when(checkpointToAdd).getCheckpointID();
-			doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-
-			try {
-				zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
-
-				// The checkpoint should be in the store if we successfully add it into the store.
-				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-				assertTrue(addedCheckpoints.contains(checkpointToAdd));
-			} catch (Exception e) {
-				// The checkpoint should not be in the store if any exception is thrown.
-				List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-				assertFalse(addedCheckpoints.contains(checkpointToAdd));
+			@Override
+			public long getStateSize() {
+				return 0;
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index c1a7b53..e9be145 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
@@ -90,8 +89,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
 			ZooKeeper.createClient(),
 			"/testPutAndRemoveJobGraph",
-			localStateStorage,
-			Executors.directExecutor());
+			localStateStorage);
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -147,7 +145,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 	@Test
 	public void testRecoverJobGraphs() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -198,10 +196,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 
 		try {
 			jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
 
 			otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
 
 
 			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
@@ -257,10 +255,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 	@Test(expected = IllegalStateException.class)
 	public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
 		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
 		ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
+				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
 		jobGraphs.start(null);
 		otherJobGraphs.start(null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
new file mode 100644
index 0000000..c4c5694
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * {@link ExternalResource} which starts a {@link org.apache.zookeeper.server.ZooKeeperServer}.
+ */
+public class ZooKeeperResource extends ExternalResource {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperResource.class);
+
+	@Nullable
+	private TestingServer zooKeeperServer;
+
+	public String getConnectString() {
+		verifyIsRunning();
+		return zooKeeperServer.getConnectString();
+	}
+
+	private void verifyIsRunning() {
+		Preconditions.checkState(zooKeeperServer != null);
+	}
+
+	@Override
+	protected void before() throws Throwable {
+		terminateZooKeeperServer();
+		zooKeeperServer = new TestingServer(true);
+	}
+
+	private void terminateZooKeeperServer() throws IOException {
+		if (zooKeeperServer != null) {
+			zooKeeperServer.stop();
+			zooKeeperServer = null;
+		}
+	}
+
+	@Override
+	protected void after() {
+		try {
+			terminateZooKeeperServer();
+		} catch (IOException e) {
+			LOG.warn("Could not properly terminate the {}.", getClass().getSimpleName(), e);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index fd39b25..2dd27e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -18,21 +18,19 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,7 +39,6 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -49,12 +46,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -88,8 +80,8 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 	@Test
 	public void testAddAndLock() throws Exception {
 		LongStateStorage longStateStorage = new LongStateStorage();
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZOOKEEPER.getClient(), longStateStorage, Executors.directExecutor());
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+			ZOOKEEPER.getClient(), longStateStorage);
 
 		// Config
 		final String pathInZooKeeper = "/testAdd";
@@ -136,7 +128,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath");
 
@@ -161,7 +153,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		when(client.inTransaction().create()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
+				client, stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
@@ -191,7 +183,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testReplace";
@@ -230,7 +222,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		RetrievableStateStorageHelper<Long> stateStorage = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateStorage, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateStorage);
 
 		store.replace("/testReplaceNonExistingPath", 0, 1L);
 	}
@@ -247,7 +239,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
+				client, stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
@@ -289,7 +281,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testGetAndExists";
@@ -314,7 +306,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		store.getAndLock("/testGetNonExistingPath");
 	}
@@ -328,7 +320,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testGetAll";
@@ -359,7 +351,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String basePath = "/testGetAllSortedByName";
@@ -393,7 +385,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testRemove";
@@ -401,50 +393,14 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		store.addAndLock(pathInZooKeeper, state);
 
+		final int numberOfGlobalDiscardCalls = LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls();
+
 		// Test
 		store.releaseAndTryRemove(pathInZooKeeper);
 
 		// Verify discarded
 		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
-	}
-
-	/**
-	 * Tests that state handles are correctly removed with a callback.
-	 */
-	@Test
-	public void testRemoveWithCallback() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testRemoveWithCallback";
-		final Long state = 27255442L;
-
-		store.addAndLock(pathInZooKeeper, state);
-
-		final CountDownLatch sync = new CountDownLatch(1);
-		ZooKeeperStateHandleStore.RemoveCallback<Long> callback = mock(ZooKeeperStateHandleStore.RemoveCallback.class);
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				sync.countDown();
-				return null;
-			}
-		}).when(callback).apply(any(RetrievableStateHandle.class));
-
-		// Test
-		store.releaseAndTryRemove(pathInZooKeeper, callback);
-
-		// Verify discarded and callback called
-		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
-
-		sync.await();
-
-		verify(callback, times(1))
-				.apply(any(RetrievableStateHandle.class));
+		assertEquals(numberOfGlobalDiscardCalls + 1, LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls());
 	}
 
 	/** Tests that all state handles are correctly discarded. */
@@ -454,7 +410,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		LongStateStorage stateHandleProvider = new LongStateStorage();
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+				ZOOKEEPER.getClient(), stateHandleProvider);
 
 		// Config
 		final String pathInZooKeeper = "/testDiscardAll";
@@ -486,8 +442,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			stateStorage,
-			Executors.directExecutor());
+			stateStorage);
 
 		final Collection<Long> input = new HashSet<>();
 		input.add(1L);
@@ -543,13 +498,11 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String statePath = "/state";
 
@@ -586,13 +539,11 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String path = "/state";
 
@@ -649,8 +600,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 			ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 				client,
-				longStateStorage,
-				Executors.directExecutor());
+				longStateStorage);
 
 			final String path = "/state";
 
@@ -682,8 +632,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final String path = "/state";
 
@@ -720,8 +669,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
 			ZOOKEEPER.getClient(),
-			longStateStorage,
-			Executors.directExecutor());
+			longStateStorage);
 
 		final Collection<String> paths = Arrays.asList("/state1", "/state2", "/state3");
 
@@ -775,9 +723,11 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		private static final long serialVersionUID = -3555329254423838912L;
 
+		private static int numberOfGlobalDiscardCalls = 0;
+
 		private final Long state;
 
-		private int numberOfDiscardCalls;
+		private int numberOfDiscardCalls = 0;
 
 		public LongRetrievableStateHandle(Long state) {
 			this.state = state;
@@ -790,6 +740,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 
 		@Override
 		public void discardState() throws Exception {
+			numberOfGlobalDiscardCalls++;
 			numberOfDiscardCalls++;
 		}
 
@@ -798,8 +749,12 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 			return 0;
 		}
 
-		public int getNumberOfDiscardCalls() {
+		int getNumberOfDiscardCalls() {
 			return numberOfDiscardCalls;
 		}
+
+		public static int getNumberOfGlobalDiscardCalls() {
+			return numberOfGlobalDiscardCalls;
+		}
 	}
 }


[flink] 02/08: [hotfix] Fix checkstyle violations in ZooKeeperCompletedCheckpointStore

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0a345ebedad6869b40d5f3b8f97e3c43460eba12
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Aug 20 09:53:05 2018 +0200

    [hotfix] Fix checkstyle violations in ZooKeeperCompletedCheckpointStore
---
 .../flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index f221270..deb1ab3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -381,7 +381,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			String numberString;
 
 			// check if we have a leading slash
-			if ('/' == path.charAt(0) ) {
+			if ('/' == path.charAt(0)) {
 				numberString = path.substring(1);
 			} else {
 				numberString = path;


[flink] 08/08: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore in Dispatcher

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a933d87e14488da9b7305a671d0cb2e24f43155c
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Aug 20 11:24:36 2018 +0200

    [FLINK-10011] Release JobGraph from SubmittedJobGraphStore in Dispatcher
    
    The Dispatcher now releases all JobGraphs it has stored in the SubmittedJobGraphStore
    if it loses leadership. This ensures that the newly elected leader after recovering
    the jobs can remove them from the SubmittedJobGraphStore. Before, the problem was
    that a former leader might still be connected to ZooKeeper which keeps its ephemeral
    lock nodes alive. This could prevent the deletion of the JobGraph from ZooKeeper.
    The problem occurs in particular in multi stand-by Dispatcher scenarios.
    
    This closes #6587.
---
 .../flink/runtime/dispatcher/Dispatcher.java       |  59 +++---
 .../dispatcher/NoOpSubmittedJobGraphListener.java  |  40 ++++
 .../runtime/dispatcher/TestingDispatcher.java      |  13 ++
 .../dispatcher/ZooKeeperHADispatcherTest.java      | 203 +++++++++++++++++++++
 4 files changed, 293 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c96acbd..c31e64c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -572,30 +572,38 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		}
 
 		return jobManagerRunnerTerminationFuture.thenRunAsync(
-			() -> {
-				jobManagerMetricGroup.removeJob(jobId);
+			() -> cleanUpJobData(jobId, cleanupHA),
+			getRpcService().getExecutor());
+	}
 
-				boolean cleanupHABlobs = false;
-				if (cleanupHA) {
-					try {
-						submittedJobGraphStore.removeJobGraph(jobId);
+	private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
+		jobManagerMetricGroup.removeJob(jobId);
 
-						// only clean up the HA blobs if we could remove the job from HA storage
-						cleanupHABlobs = true;
-					} catch (Exception e) {
-						log.warn("Could not properly remove job {} from submitted job graph store.", jobId, e);
-					}
+		boolean cleanupHABlobs = false;
+		if (cleanupHA) {
+			try {
+				submittedJobGraphStore.removeJobGraph(jobId);
 
-					try {
-						runningJobsRegistry.clearJob(jobId);
-					} catch (IOException e) {
-						log.warn("Could not properly remove job {} from the running jobs registry.", jobId, e);
-					}
-				}
+				// only clean up the HA blobs if we could remove the job from HA storage
+				cleanupHABlobs = true;
+			} catch (Exception e) {
+				log.warn("Could not properly remove job {} from submitted job graph store.", jobId, e);
+			}
 
-				blobServer.cleanupJob(jobId, cleanupHABlobs);
-			},
-			getRpcService().getExecutor());
+			try {
+				runningJobsRegistry.clearJob(jobId);
+			} catch (IOException e) {
+				log.warn("Could not properly remove job {} from the running jobs registry.", jobId, e);
+			}
+		} else {
+			try {
+				submittedJobGraphStore.releaseJobGraph(jobId);
+			} catch (Exception e) {
+				log.warn("Could not properly release job {} from submitted job graph store.", jobId, e);
+			}
+		}
+
+		blobServer.cleanupJob(jobId, cleanupHABlobs);
 	}
 
 	/**
@@ -806,8 +814,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
-		final CompletableFuture<Void> jobManagerTerminationFuture = jobManagerTerminationFutures
-			.getOrDefault(jobId, CompletableFuture.completedFuture(null))
+		final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
 			.exceptionally((Throwable throwable) -> {
 				throw new CompletionException(
 					new DispatcherException(
@@ -822,6 +829,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			getMainThreadExecutor());
 	}
 
+	protected CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+		if (jobManagerRunners.containsKey(jobId)) {
+			return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
+		} else {
+			return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
+		}
+	}
+
 	private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
 		// clear the state if we've been the leader before
 		if (getFencingToken() != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java
new file mode 100644
index 0000000..493534d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+
+/**
+ * No operation {@link org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}
+ * implemetation for testing purposes.
+ */
+public enum NoOpSubmittedJobGraphListener implements SubmittedJobGraphStore.SubmittedJobGraphListener {
+	INSTANCE;
+
+	@Override
+	public void onAddedJobGraph(JobID jobId) {
+		// No op
+	}
+
+	@Override
+	public void onRemovedJobGraph(JobID jobId) {
+		// No op
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index f5091ea..5141be0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -29,8 +31,12 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
 /**
  * {@link Dispatcher} implementation used for testing purposes.
  */
@@ -72,4 +78,11 @@ class TestingDispatcher extends Dispatcher {
 		runAsync(
 			() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
 	}
+
+	@VisibleForTesting
+	public CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
+		return callAsyncWithoutFencing(
+			() -> getJobTerminationFuture(jobId),
+			timeout).thenCompose(Function.identity());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
new file mode 100644
index 0000000..dd03758
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test cases for the interaction between ZooKeeper HA and the {@link Dispatcher}.
+ */
+public class ZooKeeperHADispatcherTest extends TestLogger {
+
+	private static final Time TIMEOUT = Time.seconds(10L);
+
+	@ClassRule
+	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static Configuration configuration;
+
+	private static TestingRpcService rpcService;
+
+	private static BlobServer blobServer;
+
+	@Rule
+	public TestName name = new TestName();
+
+	private TestingFatalErrorHandler testingFatalErrorHandler;
+
+	@BeforeClass
+	public static void setupClass() throws IOException {
+		configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+		rpcService = new TestingRpcService();
+		blobServer = new BlobServer(configuration, new VoidBlobStore());
+	}
+
+	@Before
+	public void setup() {
+		testingFatalErrorHandler = new TestingFatalErrorHandler();
+	}
+
+	@After
+	public void teardown() throws Exception {
+		if (testingFatalErrorHandler != null) {
+			testingFatalErrorHandler.rethrowError();
+		}
+	}
+
+	@AfterClass
+	public static void teardownClass() throws Exception {
+		if (rpcService != null) {
+			RpcUtils.terminateRpcService(rpcService, TIMEOUT);
+			rpcService = null;
+		}
+
+		if (blobServer != null) {
+			blobServer.close();
+			blobServer = null;
+		}
+	}
+
+	/**
+	 * Tests that the {@link Dispatcher} releases a locked {@link SubmittedJobGraph} if it
+	 * lost the leadership.
+	 */
+	@Test
+	public void testSubmittedJobGraphRelease() throws Exception {
+		final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+		final CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration);
+
+		try (final TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices()) {
+			testingHighAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration));
+
+			final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(
+				otherClient,
+				configuration);
+
+			otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
+
+			final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+			testingHighAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+			final TestingDispatcher dispatcher = createDispatcher(testingHighAvailabilityServices);
+
+			dispatcher.start();
+
+			try {
+				final DispatcherId expectedLeaderId = DispatcherId.generate();
+				leaderElectionService.isLeader(expectedLeaderId.toUUID()).get();
+
+				final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+				final JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
+				final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(nonEmptyJobGraph, TIMEOUT);
+				submissionFuture.get();
+
+				Collection<JobID> jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+				final JobID jobId = nonEmptyJobGraph.getJobID();
+				assertThat(jobIds, Matchers.contains(jobId));
+
+				leaderElectionService.notLeader();
+
+				// wait for the job to properly terminate
+				final CompletableFuture<Void> jobTerminationFuture = dispatcher.getJobTerminationFuture(jobId, TIMEOUT);
+				jobTerminationFuture.get();
+
+				// recover the job
+				final SubmittedJobGraph submittedJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobId);
+
+				assertThat(submittedJobGraph, Matchers.is(Matchers.notNullValue()));
+
+				// check that the other submitted job graph store can remove the job graph after the original leader
+				// has lost its leadership
+				otherSubmittedJobGraphStore.removeJobGraph(jobId);
+
+				jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+				assertThat(jobIds, Matchers.not(Matchers.contains(jobId)));
+			} finally {
+				RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+				client.close();
+				otherClient.close();
+			}
+		}
+	}
+
+	@Nonnull
+	private TestingDispatcher createDispatcher(TestingHighAvailabilityServices testingHighAvailabilityServices) throws Exception {
+		return new TestingDispatcher(
+			rpcService,
+			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
+			configuration,
+			testingHighAvailabilityServices,
+			new TestingResourceManagerGateway(),
+			blobServer,
+			new HeartbeatServices(1000L, 1000L),
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+			null,
+			new MemoryArchivedExecutionGraphStore(),
+			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
+			testingFatalErrorHandler);
+	}
+}


[flink] 06/08: [FLINK-10011] Release JobGraph after losing leadership in JobManager

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c395e896e5eb69a8255c372ea656483b05c8f94d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Aug 21 00:14:32 2018 +0200

    [FLINK-10011] Release JobGraph after losing leadership in JobManager
    
    The JobManager now releases its lock on all JobGraphs it has stored in
    the SubmittedJobGraphStore if the JobManager loses leadership. This ensures
    that a different JobManager can delete these jobs after it has recovered
    them and reached a globally terminal state. This is especially important
    when using stand-by JobManagers where a former leader might still be
    connected to ZooKeeper and, thus, keeping all ephemeral nodes/locks.
---
 .../org/apache/flink/runtime/akka/ActorUtils.java  |  10 ++
 .../flink/runtime/jobmanager/JobManager.scala      |  39 +++--
 .../flink/runtime/dispatcher/DispatcherHATest.java |   9 +-
 .../flink/runtime/jobmanager/JobManagerTest.java   |   3 +-
 .../jobmanager/ZooKeeperHAJobManagerTest.java      | 180 +++++++++++++++++++++
 .../testingUtils/TestingJobManagerLike.scala       |   8 +
 .../testingUtils/TestingJobManagerMessages.scala   |   3 +
 7 files changed, 231 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
index f2f9059..9a99281 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.akka;
 
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 
 import akka.actor.ActorRef;
 import akka.actor.Kill;
+import akka.actor.PoisonPill;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,5 +87,13 @@ public class ActorUtils {
 		return FutureUtils.completeAll(terminationFutures);
 	}
 
+	public static void stopActor(AkkaActorGateway akkaActorGateway) {
+		stopActor(akkaActorGateway.actor());
+	}
+
+	public static void stopActor(ActorRef actorRef) {
+		actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+	}
+
 	private ActorUtils() {}
 }
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0988730..6d27dc3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1729,21 +1729,22 @@ class JobManager(
     val futureOption = currentJobs.remove(jobID) match {
       case Some((eg, _)) =>
         val cleanUpFuture: Future[Unit] = Future {
-          val cleanupHABlobs = if (removeJobFromStateBackend) {
-            try {
+          val cleanupHABlobs = try {
+            if (removeJobFromStateBackend) {
               // ...otherwise, we can have lingering resources when there is a  concurrent shutdown
               // and the ZooKeeper client is closed. Not removing the job immediately allow the
               // shutdown to release all resources.
               submittedJobGraphs.removeJobGraph(jobID)
               true
-            } catch {
-              case t: Throwable => {
-                log.warn(s"Could not remove submitted job graph $jobID.", t)
-                false
-              }
+            } else {
+              submittedJobGraphs.releaseJobGraph(jobID)
+              false
+            }
+          } catch {
+            case t: Throwable => {
+              log.warn(s"Could not remove submitted job graph $jobID.", t)
+              false
             }
-          } else {
-            false
           }
 
           blobServer.cleanupJob(jobID, cleanupHABlobs)
@@ -1778,19 +1779,23 @@ class JobManager(
     */
   private def cancelAndClearEverything(cause: Throwable)
     : Seq[Future[Unit]] = {
-    val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
-      future {
-        eg.suspend(cause)
-        jobManagerMetricGroup.removeJob(eg.getJobID)
+
+    val futures = currentJobs.values.flatMap(
+      egJobInfo => {
+        val executionGraph = egJobInfo._1
+        val jobInfo = egJobInfo._2
+
+        executionGraph.suspend(cause)
+
+        val jobId = executionGraph.getJobID
 
         jobInfo.notifyNonDetachedClients(
           decorateMessage(
             Failure(
-              new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))))
-      }(context.dispatcher)
-    }
+              new JobExecutionException(jobId, "All jobs are cancelled and cleared.", cause))))
 
-    currentJobs.clear()
+        removeJob(jobId, false)
+      })
 
     futures.toSeq
   }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 5876c5f..adf7618 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -162,9 +163,13 @@ public class DispatcherHATest extends TestLogger {
 	}
 
 	@Nonnull
-	private JobGraph createNonEmptyJobGraph() {
+	public static JobGraph createNonEmptyJobGraph() {
 		final JobVertex noOpVertex = new JobVertex("NoOp vertex");
-		return new JobGraph(noOpVertex);
+		noOpVertex.setInvokableClass(NoOpInvokable.class);
+		final JobGraph jobGraph = new JobGraph(noOpVertex);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		return jobGraph;
 	}
 
 	private static class HATestingDispatcher extends TestingDispatcher {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 052349c..098f564 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -32,8 +32,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
@@ -152,7 +152,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Mockito.mock;
 
 public class JobManagerTest extends TestLogger {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
new file mode 100644
index 0000000..8e5b1b9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherHATest;
+import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+	@ClassRule
+	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS);
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		final Future<Terminated> terminationFuture = system.terminate();
+		Await.ready(terminationFuture, TIMEOUT);
+	}
+
+	/**
+	 * Tests that the {@link JobManager} releases all locked {@link JobGraph} if it loses
+	 * leadership.
+	 */
+	@Test
+	public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+		try (TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices()) {
+
+			final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+			final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+			highAvailabilityServices.setJobMasterLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID, leaderElectionService);
+			highAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration));
+			highAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+
+			final CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration);
+			final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(otherClient, configuration);
+			otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
+
+			ActorRef jobManagerActorRef = null;
+			try {
+				jobManagerActorRef = JobManager.startJobManagerActors(
+					configuration,
+					system,
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
+					highAvailabilityServices,
+					NoOpMetricRegistry.INSTANCE,
+					Option.empty(),
+					TestingJobManager.class,
+					MemoryArchivist.class)._1();
+
+				waitForActorToBeStarted(jobManagerActorRef, TIMEOUT);
+
+				final ActorGateway jobManager = new AkkaActorGateway(jobManagerActorRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+				leaderElectionService.isLeader(HighAvailabilityServices.DEFAULT_LEADER_ID).get();
+
+				final JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
+
+				final JobManagerMessages.SubmitJob submitJobMessage = new JobManagerMessages.SubmitJob(nonEmptyJobGraph, ListeningBehaviour.DETACHED);
+
+				Await.result(jobManager.ask(submitJobMessage, TIMEOUT), TIMEOUT);
+
+				Collection<JobID> jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+				final JobID jobId = nonEmptyJobGraph.getJobID();
+				assertThat(jobIds, contains(jobId));
+
+				// revoke the leadership
+				leaderElectionService.notLeader();
+
+				Await.result(jobManager.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(), TIMEOUT), TIMEOUT);
+
+				final SubmittedJobGraph recoveredJobGraph = akka.serialization.JavaSerializer.currentSystem().withValue(
+					((ExtendedActorSystem) system),
+					() -> otherSubmittedJobGraphStore.recoverJobGraph(jobId));
+
+				assertThat(recoveredJobGraph, is(notNullValue()));
+
+				otherSubmittedJobGraphStore.removeJobGraph(jobId);
+
+				jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+				assertThat(jobIds, not(contains(jobId)));
+			} finally {
+				client.close();
+				otherClient.close();
+
+				if (jobManagerActorRef != null) {
+					ActorUtils.stopActor(jobManagerActorRef);
+				}
+			}
+		}
+	}
+
+	private void waitForActorToBeStarted(ActorRef jobManagerActorRef, FiniteDuration timeout) throws InterruptedException, java.util.concurrent.TimeoutException {
+		Await.ready(Patterns.ask(jobManagerActorRef, new Identify(42), timeout.toMillis()), timeout);
+	}
+}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 0640f39..ebe4639 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -454,6 +454,14 @@ trait TestingJobManagerLike extends FlinkActor {
         val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
         receiver ! Acknowledge.get()
       }
+
+    case WaitForBackgroundTasksToFinish =>
+      val future = futuresToComplete match {
+        case Some(futures) => Future.sequence(futures)
+        case None => Future.successful(Seq())
+      }
+
+      future.pipeTo(sender())
   }
 
   def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index c8529a9..64af056 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -59,6 +59,8 @@ object TestingJobManagerMessages {
 
   case object NotifyListeners
 
+  case object WaitForBackgroundTasksToFinish
+
   case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
   case class TaskManagerTerminated(taskManager: ActorRef)
 
@@ -164,4 +166,5 @@ object TestingJobManagerMessages {
   def getClientConnected(): AnyRef = ClientConnected
   def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
 
+  def getWaitForBackgroundTasksToFinish(): AnyRef = WaitForBackgroundTasksToFinish
 }


[flink] 05/08: [FLINK-10011] Introduce SubmittedJobGraphStore#releaseJobGraph

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cb69af18c5545b7218ea16d25d0b910f767ccf49
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Aug 20 09:42:28 2018 +0200

    [FLINK-10011] Introduce SubmittedJobGraphStore#releaseJobGraph
    
    SubmitedJobGraphStore#releaseJobGraph removes a potentially existing lock
    from the specified JobGraph. This allows other SubmittedJobGraphStores to
    remove the JobGraph given that it is no longer locked.
---
 .../SingleJobSubmittedJobGraphStore.java           |  9 ++++++--
 .../StandaloneSubmittedJobGraphStore.java          | 13 ++++++++----
 .../runtime/jobmanager/SubmittedJobGraphStore.java | 12 +++++++++++
 .../ZooKeeperSubmittedJobGraphStore.java           | 24 +++++++++++++++++++---
 .../flink/runtime/dispatcher/DispatcherHATest.java |  5 +++++
 .../testutils/InMemorySubmittedJobGraphStore.java  |  5 +++++
 6 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
index 26d3abc..fe7f5f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
@@ -66,12 +66,17 @@ public class SingleJobSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	}
 
 	@Override
-	public void removeJobGraph(JobID jobId) throws Exception {
+	public void removeJobGraph(JobID jobId) {
 		// ignore
 	}
 
 	@Override
-	public Collection<JobID> getJobIds() throws Exception {
+	public void releaseJobGraph(JobID jobId) {
+		// ignore
+	}
+
+	@Override
+	public Collection<JobID> getJobIds() {
 		return Collections.singleton(jobGraph.getJobID());
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index d1ca1a3..f28621f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -43,22 +43,27 @@ public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore
 	}
 
 	@Override
-	public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+	public void putJobGraph(SubmittedJobGraph jobGraph) {
 		// Nothing to do
 	}
 
 	@Override
-	public void removeJobGraph(JobID jobId) throws Exception {
+	public void removeJobGraph(JobID jobId) {
 		// Nothing to do
 	}
 
 	@Override
-	public Collection<JobID> getJobIds() throws Exception {
+	public void releaseJobGraph(JobID jobId) {
+		// nothing to do
+	}
+
+	@Override
+	public Collection<JobID> getJobIds() {
 		return Collections.emptyList();
 	}
 
 	@Override
-	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+	public SubmittedJobGraph recoverJobGraph(JobID jobId) {
 		return null;
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index 7e624ec..b40a4a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import javax.annotation.Nullable;
 
@@ -59,6 +60,17 @@ public interface SubmittedJobGraphStore {
 	void removeJobGraph(JobID jobId) throws Exception;
 
 	/**
+	 * Releases the locks on the specified {@link JobGraph}.
+	 *
+	 * Releasing the locks allows that another instance can delete the job from
+	 * the {@link SubmittedJobGraphStore}.
+	 *
+	 * @param jobId specifying the job to release the locks for
+	 * @throws Exception if the locks cannot be released
+	 */
+	void releaseJobGraph(JobID jobId) throws Exception;
+
+	/**
 	 * Get all job ids of submitted job graphs to the submitted job graph store.
 	 *
 	 * @return Collection of submitted job ids
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 0510815..2b935af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -68,13 +68,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	/** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
 	private final Object cacheLock = new Object();
 
-	/** Client (not a namespace facade) */
+	/** Client (not a namespace facade). */
 	private final CuratorFramework client;
 
 	/** The set of IDs of all added job graphs. */
 	private final Set<JobID> addedJobGraphs = new HashSet<>();
 
-	/** Completed checkpoints in ZooKeeper */
+	/** Completed checkpoints in ZooKeeper. */
 	private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
 
 	/**
@@ -93,7 +93,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	private boolean isRunning;
 
 	/**
-	 * Submitted job graph store backed by ZooKeeper
+	 * Submitted job graph store backed by ZooKeeper.
 	 *
 	 * @param client ZooKeeper client
 	 * @param currentJobsPath ZooKeeper path for current job graphs
@@ -274,6 +274,24 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	}
 
 	@Override
+	public void releaseJobGraph(JobID jobId) throws Exception {
+		checkNotNull(jobId, "Job ID");
+		final String path = getPathForJob(jobId);
+
+		LOG.debug("Releasing locks of job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
+		synchronized (cacheLock) {
+			if (addedJobGraphs.contains(jobId)) {
+				jobGraphsInZooKeeper.release(path);
+
+				addedJobGraphs.remove(jobId);
+			}
+		}
+
+		LOG.info("Released locks of job graph {} from ZooKeeper.", jobId);
+	}
+
+	@Override
 	public Collection<JobID> getJobIds() throws Exception {
 		Collection<String> paths;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 2c030d2..5876c5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -244,6 +244,11 @@ public class DispatcherHATest extends TestLogger {
 		}
 
 		@Override
+		public void releaseJobGraph(JobID jobId) throws Exception {
+			throw new UnsupportedOperationException("Should not be called.");
+		}
+
+		@Override
 		public Collection<JobID> getJobIds() throws Exception {
 			enterGetJobIdsLatch.trigger();
 			proceedGetJobIdsLatch.await();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index ba0dc80..3b9c578 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -97,6 +97,11 @@ public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {
 	}
 
 	@Override
+	public void releaseJobGraph(JobID jobId) {
+		verifyIsStarted();
+	}
+
+	@Override
 	public synchronized Collection<JobID> getJobIds() throws Exception {
 		verifyIsStarted();