You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/28 18:36:48 UTC

[05/11] flink git commit: [FLINK-5501] [runtime] Followups and improvements to RunningJobsRegistry

[FLINK-5501] [runtime] Followups and improvements to RunningJobsRegistry

This commit changes the following:
  - Remove the unsafe 'isJobRunning()' method.
  - Extract duplicate code into utility functions
  - Simplify the NonHaRegistry by using a map rather than two sets
  - Improve exception handling / error messages for the ZooKeeper-based registry
  - Slight improvement of error handling in the JobManagerRunner
  - Compare enums with '==' (better null-pointer safety)
  - Correct 'expected' and 'actual' parameters in 'assertEquals'
  - Forward tests also to the HDFS file based registry test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0dede9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0dede9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0dede9f

Branch: refs/heads/master
Commit: e0dede9fb0a2ef7560254b6fc40d852ebf16c956
Parents: e7011d7
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 27 21:56:36 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 28 18:59:10 2017 +0100

----------------------------------------------------------------------
 .../configuration/HighAvailabilityOptions.java  |  4 ++
 .../FsNegativeRunningJobsRegistryTest.java      | 40 +++++++----
 .../FsNegativeRunningJobsRegistry.java          | 68 +++++++-----------
 .../highavailability/RunningJobsRegistry.java   | 39 ++++++-----
 .../highavailability/ZookeeperRegistry.java     | 73 ++++++++++----------
 .../highavailability/nonha/NonHaRegistry.java   | 45 ++++--------
 .../runtime/jobmaster/JobManagerRunner.java     | 20 +++---
 .../minicluster/MiniClusterJobDispatcher.java   |  2 +-
 .../resourcemanager/JobLeaderIdService.java     |  4 +-
 .../FsNegativeRunningJobsRegistryTest.java      | 37 +++++-----
 .../highavailability/ZooKeeperRegistryTest.java | 33 ++++-----
 11 files changed, 166 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 4792eba..b883bc3 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -124,6 +124,10 @@ public class HighAvailabilityOptions {
 			.defaultValue(3)
 			.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
 
+	public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH = 
+			key("high-availability.zookeeper.job.registry")
+			.defaultValue("/running_job_registry/");
+
 	// ------------------------------------------------------------------------
 
 	/** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
index 40d75e8..bb27b8b 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
 
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
@@ -33,9 +34,11 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the {@link FsNegativeRunningJobsRegistry} on HDFS.
+ */
 public class FsNegativeRunningJobsRegistryTest {
 
 	@ClassRule
@@ -83,20 +86,22 @@ public class FsNegativeRunningJobsRegistryTest {
 
 		FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir);
 
-		// initially, without any call, the job is considered running
-		assertTrue(registry.isJobRunning(jid));
+		// another registry should pick this up
+		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
+
+		// initially, without any call, the job is pending
+		assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid));
+		assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid));
 
-		// repeated setting should not affect the status
+		// after set running, the job is running
 		registry.setJobRunning(jid);
-		assertTrue(registry.isJobRunning(jid));
+		assertEquals(JobSchedulingStatus.RUNNING, registry.getJobSchedulingStatus(jid));
+		assertEquals(JobSchedulingStatus.RUNNING, otherRegistry.getJobSchedulingStatus(jid));
 
 		// set the job to finished and validate
 		registry.setJobFinished(jid);
-		assertFalse(registry.isJobRunning(jid));
-
-		// another registry should pick this up
-		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
-		assertFalse(otherRegistry.isJobRunning(jid));
+		assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
+		assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid));
 	}
 
 	@Test
@@ -108,14 +113,19 @@ public class FsNegativeRunningJobsRegistryTest {
 
 		// set the job to finished and validate
 		registry.setJobFinished(jid);
-		assertFalse(registry.isJobRunning(jid));
+		assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
 
-		// set the job to back to running and validate
+		// setting the job to running does not overwrite the finished status
 		registry.setJobRunning(jid);
-		assertTrue(registry.isJobRunning(jid));
+		assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
 
 		// another registry should pick this up
 		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
-		assertTrue(otherRegistry.isJobRunning(jid));
+		assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid));
+
+		// clear the running and finished marker, it will be pending
+		otherRegistry.clearJob(jid);
+		assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid));
+		assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
index 9e92263..cb79a65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.highavailability;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 
 import java.io.FileNotFoundException;
@@ -30,19 +31,18 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This {@link RunningJobsRegistry} tracks the status jobs via marker files,
- * marking running jobs via running marker files,
- * marking finished jobs via finished marker files.
+ * marking running jobs via running marker files, marking finished jobs via finished marker files.
  * 
  * <p>The general contract is the following:
  * <ul>
  *     <li>Initially, a marker file does not exist (no one created it, yet), which means
- *         the specific job is pending</li>
+ *         the specific job is pending.</li>
  *     <li>The first JobManager that granted leadership calls this service to create the running marker file,
  *         which marks the job as running.</li>
+ *     <li>If a JobManager gains leadership but sees the running marker file,
+ *         it will realize that the job has been scheduled already and needs reconciling.</li>
  *     <li>The JobManager that finishes calls this service to create the marker file,
  *         which marks the job as finished.</li>
- *     <li>If a JobManager gains leadership but see the running marker file,
- *         it will realize that the job has been scheduled and need reconciling.</li>
  *     <li>If a JobManager gains leadership at some point when shutdown is in progress,
  *         it will see the marker file and realize that the job is finished.</li>
  *     <li>The application framework is expected to clean the file once the application
@@ -50,7 +50,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *         start the job, even if it gains leadership.</li>
  * </ul>
  * 
- * <p>It is especially tailored towards deployment modes like for example
+ * <p>This registry is especially tailored towards deployment modes like for example
  * YARN, where HDFS is available as a persistent file system, and the YARN
  * application's working directories on HDFS are automatically cleaned
  * up after the application completed. 
@@ -99,8 +99,8 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
 		// to be safe, attempt to write to the working directory, to
 		// catch problems early
 		final Path testFile = new Path(workingDirectory, ".registry_test");
-		try (FSDataOutputStream out = fileSystem.create(testFile, false)) {
-			out.write(42);
+		try {
+			createFile(testFile, false);
 		}
 		catch (IOException e) {
 			throw new IOException("Unable to write to working directory: " + workingDirectory, e);
@@ -119,9 +119,7 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
 
 		// create the file
 		// to avoid an exception if the job already exists, set overwrite=true
-		try (FSDataOutputStream out = fileSystem.create(filePath, true)) {
-			out.write(42);
-		}
+		createFile(filePath, true);
 	}
 
 	@Override
@@ -131,25 +129,7 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
 
 		// create the file
 		// to avoid an exception if the job already exists, set overwrite=true
-		try (FSDataOutputStream out = fileSystem.create(filePath, true)) {
-			out.write(42);
-		}
-	}
-
-	@Override
-	public boolean isJobRunning(JobID jobID) throws IOException {
-		checkNotNull(jobID, "jobID");
-
-		// check for the existence of the file
-		try {
-			fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID));
-			// file was found --> job is running
-			return true;
-		}
-		catch (FileNotFoundException e) {
-			// file does not exist, job is not running
-			return false;
-		}
+		createFile(filePath, true);
 	}
 
 	@Override
@@ -157,21 +137,16 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
 		checkNotNull(jobID, "jobID");
 
 		// first check for the existence of the complete file
-		try {
-			fileSystem.getFileStatus(createMarkerFilePath(DONE_PREFIX, jobID));
+		if (fileSystem.exists(createMarkerFilePath(DONE_PREFIX, jobID))) {
 			// complete file was found --> job is terminated
 			return JobSchedulingStatus.DONE;
 		}
-		catch (FileNotFoundException e) {
-			// file does not exist, job is running or pending
-		}
 		// check for the existence of the running file
-		try {
-			fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID));
+		else if (fileSystem.exists(createMarkerFilePath(RUNNING_PREFIX, jobID))) {
 			// running file was found --> job is terminated
 			return JobSchedulingStatus.RUNNING;
 		}
-		catch (FileNotFoundException e) {
+		else {
 			// file does not exist, job is not scheduled
 			return JobSchedulingStatus.PENDING;
 		}
@@ -181,25 +156,30 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
 	public void clearJob(JobID jobID) throws IOException {
 		checkNotNull(jobID, "jobID");
 		final Path runningFilePath = createMarkerFilePath(RUNNING_PREFIX, jobID);
+		final Path doneFilePath = createMarkerFilePath(DONE_PREFIX, jobID);
 
 		// delete the running marker file, if it exists
 		try {
 			fileSystem.delete(runningFilePath, false);
 		}
-		catch (FileNotFoundException e) {
-		}
-
-		final Path doneFilePath = createMarkerFilePath(DONE_PREFIX, jobID);
+		catch (FileNotFoundException ignored) {}
 
 		// delete the finished marker file, if it exists
 		try {
 			fileSystem.delete(doneFilePath, false);
 		}
-		catch (FileNotFoundException e) {
-		}
+		catch (FileNotFoundException ignored) {}
 	}
 
 	private Path createMarkerFilePath(String prefix, JobID jobId) {
 		return new Path(basePath, prefix + jobId.toString());
 	}
+
+	private void createFile(Path path, boolean overwrite) throws IOException {
+		final WriteMode writeMode = overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE;
+
+		try (FSDataOutputStream out = fileSystem.create(path, writeMode)) {
+			out.write(42);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
index 020f2ca..43e5ac5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
@@ -23,29 +23,40 @@ import org.apache.flink.api.common.JobID;
 import java.io.IOException;
 
 /**
- * This registry tracks if a certain job is running.
+ * A simple registry that tracks if a certain job is pending execution, running, or completed.
  * 
  * <p>This registry is used in highly-available setups with multiple master nodes,
  * to determine whether a new leader should attempt to recover a certain job (because the 
  * job is still running), or whether the job has already finished successfully (in case of a
  * finite job) and the leader has only been granted leadership because the previous leader
  * quit cleanly after the job was finished.
+ * 
+ * <p>In addition, the registry can help to determine whether a newly assigned leader JobManager
+ * should attempt reconciliation with running TaskManagers, or immediately schedule the job from
+ * the latest checkpoint/savepoint. 
  */
 public interface RunningJobsRegistry {
 
-	public enum JobSchedulingStatus {
-		/** Job has not been scheduled */
+	/**
+	 * The scheduling status of a job, as maintained by the {@code RunningJobsRegistry}.
+	 */
+	enum JobSchedulingStatus {
+
+		/** Job has not been scheduled, yet */
 		PENDING,
 
-		/** Job has been scheduled */
+		/** Job has been scheduled and is not yet finished */
 		RUNNING,
 
-		/** Job has been finished */
+		/** Job has been finished, successfully or unsuccessfully */
 		DONE;
 	}
 
+	// ------------------------------------------------------------------------
+
 	/**
-	 * Marks a job as running.
+	 * Marks a job as running. Requesting the job's status via the {@link #getJobSchedulingStatus(JobID)}
+	 * method will return {@link JobSchedulingStatus#RUNNING}.
 	 * 
 	 * @param jobID The id of the job.
 	 *
@@ -55,7 +66,8 @@ public interface RunningJobsRegistry {
 	void setJobRunning(JobID jobID) throws IOException;
 
 	/**
-	 * Marks a job as running.
+	 * Marks a job as completed. Requesting the job's status via the {@link #getJobSchedulingStatus(JobID)}
+	 * method will return {@link JobSchedulingStatus#DONE}.
 	 *
 	 * @param jobID The id of the job.
 	 * 
@@ -65,18 +77,7 @@ public interface RunningJobsRegistry {
 	void setJobFinished(JobID jobID) throws IOException;
 
 	/**
-	 * Checks whether a job is running.
-	 *
-	 * @param jobID The id of the job to check.
-	 * @return True if the job is still running, false otherwise.
-	 * 
-	 * @throws IOException Thrown when the communication with the highly-available storage or registry
-	 *                     failed and could not be retried.
-	 */
-	boolean isJobRunning(JobID jobID) throws IOException;
-
-	/**
-	 * Get the scheduing status of a job.
+	 * Gets the scheduling status of a job.
 	 *
 	 * @param jobID The id of the job to check.
 	 * @return The job scheduling status.

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
index 31a4535..a8be35a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
@@ -26,6 +26,7 @@ import org.apache.zookeeper.data.Stat;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -33,20 +34,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * A zookeeper based registry for running jobs, highly available.
  */
 public class ZookeeperRegistry implements RunningJobsRegistry {
-	
-	private static final String DEFAULT_HA_JOB_REGISTRY_PATH = "/running_job_registry/";
+
+	private static final Charset ENCODING = Charset.forName("utf-8");
 
 	/** The ZooKeeper client to use */
 	private final CuratorFramework client;
 
 	private final String runningJobPath;
 
-	private static final String HA_JOB_REGISTRY_PATH = "high-availability.zookeeper.job.registry";
-
 	public ZookeeperRegistry(final CuratorFramework client, final Configuration configuration) {
-		this.client = client;
-		runningJobPath = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT) + 
-			configuration.getString(HA_JOB_REGISTRY_PATH, DEFAULT_HA_JOB_REGISTRY_PATH);
+		this.client = checkNotNull(client, "client");
+		this.runningJobPath = configuration.getString(HighAvailabilityOptions.ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH);
 	}
 
 	@Override
@@ -54,12 +52,10 @@ public class ZookeeperRegistry implements RunningJobsRegistry {
 		checkNotNull(jobID);
 
 		try {
-			String zkPath = runningJobPath + jobID.toString();
-			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-			this.client.setData().forPath(zkPath, JobSchedulingStatus.RUNNING.name().getBytes(Charset.forName("utf-8")));
+			writeEnumToZooKeeper(jobID, JobSchedulingStatus.RUNNING);
 		}
 		catch (Exception e) {
-			throw new IOException("Set running state to zk fail for job " + jobID.toString(), e);
+			throw new IOException("Failed to set RUNNING state in ZooKeeper for job " + jobID, e);
 		}
 	}
 
@@ -68,44 +64,36 @@ public class ZookeeperRegistry implements RunningJobsRegistry {
 		checkNotNull(jobID);
 
 		try {
-			String zkPath = runningJobPath + jobID.toString();
-			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-			this.client.setData().forPath(zkPath, JobSchedulingStatus.DONE.name().getBytes(Charset.forName("utf-8")));
+			writeEnumToZooKeeper(jobID, JobSchedulingStatus.DONE);
 		}
 		catch (Exception e) {
-			throw new IOException("Set finished state to zk fail for job " + jobID.toString(), e);
+			throw new IOException("Failed to set DONE state in ZooKeeper for job " + jobID, e);
 		}
 	}
 
 	@Override
-	public boolean isJobRunning(JobID jobID) throws IOException {
+	public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
 		checkNotNull(jobID);
 
 		try {
-			Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString());
+			final String zkPath = createZkPath(jobID);
+			final Stat stat = client.checkExists().forPath(zkPath);
 			if (stat != null) {
-				byte[] data = client.getData().forPath(runningJobPath + jobID.toString());
-				if (JobSchedulingStatus.RUNNING.name().equals(new String(data))) {
-					return true;
+				// found some data, try to parse it
+				final byte[] data = client.getData().forPath(zkPath);
+				if (data != null) {
+					try {
+						final String name = new String(data, ENCODING);
+						return JobSchedulingStatus.valueOf(name);
+					}
+					catch (IllegalArgumentException e) {
+						throw new IOException("Found corrupt data in ZooKeeper: " + 
+								Arrays.toString(data) + " is no valid job status");
+					}
 				}
 			}
-			return false;
-		}
-		catch (Exception e) {
-			throw new IOException("Get running state from zk fail for job " + jobID.toString(), e);
-		}
-	}
 
-	@Override
-	public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
-		checkNotNull(jobID);
-
-		try {
-			Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString());
-			if (stat != null) {
-				byte[] data = client.getData().forPath(runningJobPath + jobID.toString());
-				return JobSchedulingStatus.valueOf(new String(data));
-			}
+			// nothing found, yet, must be in status 'PENDING'
 			return JobSchedulingStatus.PENDING;
 		}
 		catch (Exception e) {
@@ -118,13 +106,22 @@ public class ZookeeperRegistry implements RunningJobsRegistry {
 		checkNotNull(jobID);
 
 		try {
-			String zkPath = runningJobPath + jobID.toString();
+			final String zkPath = createZkPath(jobID);
 			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
 			this.client.delete().forPath(zkPath);
 		}
 		catch (Exception e) {
-			throw new IOException("Clear job state from zk fail for " + jobID.toString(), e);
+			throw new IOException("Failed to clear job state from ZooKeeper for job " + jobID, e);
 		}
 	}
 
+	private String createZkPath(JobID jobID) {
+		return runningJobPath + jobID.toString();
+	}
+
+	private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
+		final String zkPath = createZkPath(jobID);
+		this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+		this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
index e331212..ab1ce47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
@@ -20,9 +20,8 @@ package org.apache.flink.runtime.highavailability.nonha;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 
-import java.util.HashSet;
+import java.util.HashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -32,18 +31,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class NonHaRegistry implements RunningJobsRegistry {
 
 	/** The currently running jobs */
-	private final HashSet<JobID> running = new HashSet<>();
-
-	/** The currently finished jobs */
-	private final HashSet<JobID> finished = new HashSet<>();
+	private final HashMap<JobID, JobSchedulingStatus> jobStatus = new HashMap<>();
 
 	@Override
 	public void setJobRunning(JobID jobID) {
 		checkNotNull(jobID);
 
-		synchronized (running) {
-			running.add(jobID);
-			finished.remove(jobID);
+		synchronized (jobStatus) {
+			jobStatus.put(jobID, JobSchedulingStatus.RUNNING);
 		}
 	}
 
@@ -51,33 +46,18 @@ public class NonHaRegistry implements RunningJobsRegistry {
 	public void setJobFinished(JobID jobID) {
 		checkNotNull(jobID);
 
-		synchronized (running) {
-			running.remove(jobID);
-			finished.add(jobID);
-		}
-	}
-
-	@Override
-	public boolean isJobRunning(JobID jobID) {
-		checkNotNull(jobID);
-
-		synchronized (running) {
-			return running.contains(jobID);
+		synchronized (jobStatus) {
+			jobStatus.put(jobID, JobSchedulingStatus.DONE);
 		}
 	}
 
 	@Override
 	public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
 		checkNotNull(jobID);
-
-		synchronized (running) {
-			if (finished.contains(jobID)) {
-				return JobSchedulingStatus.DONE;
-			} else if (running.contains(jobID)) {
-				return JobSchedulingStatus.RUNNING;
-			} else {
-				return JobSchedulingStatus.PENDING;
-			}
+		
+		synchronized (jobStatus) {
+			JobSchedulingStatus status = jobStatus.get(jobID);
+			return status == null ? JobSchedulingStatus.PENDING : status;
 		}
 	}
 
@@ -85,9 +65,8 @@ public class NonHaRegistry implements RunningJobsRegistry {
 	public void clearJob(JobID jobID) {
 		checkNotNull(jobID);
 
-		synchronized (running) {
-			running.remove(jobID);
-			finished.remove(jobID);
+		synchronized (jobStatus) {
+			jobStatus.remove(jobID);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 6bebd90..6e02813 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -360,20 +360,22 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			// it's okay that job manager wait for the operation complete
 			leaderElectionService.confirmLeaderSessionID(leaderSessionID);
 
-			JobSchedulingStatus schedulingStatus = JobSchedulingStatus.PENDING;
+			final JobSchedulingStatus schedulingStatus;
 			try {
 				schedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID());
-				if (schedulingStatus.equals(JobSchedulingStatus.DONE)) {
-					log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID());
-					jobFinishedByOther();
-					return;
-				}
-			} catch (Throwable t) {
-				log.error("Could not access status (running/finished) of job {}. ",	jobGraph.getJobID(), t);
+			}
+			catch (Throwable t) {
+				log.error("Could not access status (running/finished) of job {}. ", jobGraph.getJobID(), t);
 				onFatalError(t);
 				return;
 			}
 
+			if (schedulingStatus == JobSchedulingStatus.DONE) {
+				log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID());
+				jobFinishedByOther();
+				return;
+			}
+
 			// Double check the leadership after we confirm that, there is a small chance that multiple
 			// job managers schedule the same job after if they try to recover at the same time.
 			// This will eventually be noticed, but can not be ruled out from the beginning.
@@ -382,7 +384,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 					// Now set the running status is after getting leader ship and 
 					// set finished status after job in terminated status.
 					// So if finding the job is running, it means someone has already run the job, need recover.
-					if (schedulingStatus.equals(JobSchedulingStatus.PENDING)) {
+					if (schedulingStatus == JobSchedulingStatus.PENDING) {
 						runningJobsRegistry.setJobRunning(jobGraph.getJobID());
 					}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 9178684..dd80ada 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -271,7 +271,7 @@ public class MiniClusterJobDispatcher {
 			haServices.getRunningJobsRegistry().clearJob(jobID);
 		}
 		catch (Throwable t) {
-			LOG.warn("Could not clear the job {} at the high-availability services", jobID.toString(), t);
+			LOG.warn("Could not clear job {} at the status registry of the high-availability services", jobID, t);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 6c7e249..7ef39de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.ExceptionUtils;
@@ -244,7 +245,8 @@ public class JobLeaderIdService {
 				}
 
 				try {
-					if (runningJobsRegistry.isJobRunning(jobId)) {
+					final JobSchedulingStatus jobStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
+					if (jobStatus == JobSchedulingStatus.PENDING || jobStatus == JobSchedulingStatus.RUNNING) {
 						if (leaderSessionId == null) {
 							// there is no new leader
 							if (previousJobLeaderId != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
index bbafcf0..b0c7778 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
@@ -30,9 +30,10 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the {@link FsNegativeRunningJobsRegistry} on a local file system.
+ */
 public class FsNegativeRunningJobsRegistryTest extends TestLogger {
 
 	@Rule
@@ -47,23 +48,22 @@ public class FsNegativeRunningJobsRegistryTest extends TestLogger {
 
 		FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri));
 
+		// another registry should pick this up
+		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
+
 		// initially, without any call, the job is pending
-		assertFalse(registry.isJobRunning(jid));
-		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.PENDING);
+		assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid));
+		assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid));
 
 		// after set running, the job is running
 		registry.setJobRunning(jid);
-		assertTrue(registry.isJobRunning(jid));
-		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.RUNNING);
+		assertEquals(JobSchedulingStatus.RUNNING, registry.getJobSchedulingStatus(jid));
+		assertEquals(JobSchedulingStatus.RUNNING, otherRegistry.getJobSchedulingStatus(jid));
 
 		// set the job to finished and validate
 		registry.setJobFinished(jid);
-		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
-
-		// another registry should pick this up
-		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
-		assertTrue(otherRegistry.isJobRunning(jid));
-		assertEquals(otherRegistry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+		assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
+		assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid));
 	}
 
 	@Test
@@ -77,22 +77,19 @@ public class FsNegativeRunningJobsRegistryTest extends TestLogger {
 
 		// set the job to finished and validate
 		registry.setJobFinished(jid);
-		assertFalse(registry.isJobRunning(jid));
-		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+		assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
 
 		// set the job to running does not overwrite the finished status
 		registry.setJobRunning(jid);
-		assertTrue(registry.isJobRunning(jid));
-		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+		assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
 
 		// another registry should pick this up
 		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
-		assertTrue(otherRegistry.isJobRunning(jid));
-		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+		assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid));
 
 		// clear the running and finished marker, it will be pending
 		otherRegistry.clearJob(jid);
-		assertFalse(otherRegistry.isJobRunning(jid));
-		assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.PENDING);
+		assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid));
+		assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
index 8c91898..b1881e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
@@ -22,22 +22,21 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+
 
 public class ZooKeeperRegistryTest extends TestLogger {
-	private TestingServer testingServer;
 
-	private static Logger LOG = LoggerFactory.getLogger(ZooKeeperRegistryTest.class);
+	private TestingServer testingServer;
 
 	@Before
 	public void before() throws Exception {
@@ -59,29 +58,25 @@ public class ZooKeeperRegistryTest extends TestLogger {
 		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
 		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
-		HighAvailabilityServices zkHaService = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
-		RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
+		final HighAvailabilityServices zkHaService = new ZookeeperHaServices(
+				ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration);
+
+		final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
 
 		try {
 			JobID jobID = JobID.generate();
-			assertFalse(zkRegistry.isJobRunning(jobID));
-			assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.PENDING);
+			assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
 
 			zkRegistry.setJobRunning(jobID);
-			assertTrue(zkRegistry.isJobRunning(jobID));
-			assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.RUNNING);
+			assertEquals(JobSchedulingStatus.RUNNING, zkRegistry.getJobSchedulingStatus(jobID));
 
 			zkRegistry.setJobFinished(jobID);
-			assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.DONE);
-			assertFalse(zkRegistry.isJobRunning(jobID));
+			assertEquals(JobSchedulingStatus.DONE, zkRegistry.getJobSchedulingStatus(jobID));
 
 			zkRegistry.clearJob(jobID);
-			assertFalse(zkRegistry.isJobRunning(jobID));
-			assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.PENDING);
+			assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
 		} finally {
-			if (zkHaService != null) {
-				zkHaService.close();
-			}
+			zkHaService.close();
 		}
 	}
 }