You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/28 18:36:48 UTC
[05/11] flink git commit: [FLINK-5501] [runtime] Followups and
improvements to RunningJobsRegistry
[FLINK-5501] [runtime] Followups and improvements to RunningJobsRegistry
This commit changes the following:
- Remove the unsafe 'isJobRunning()' method.
- Extract duplicate code into utility functions
- Simplify the NonHaRegistry by using a map rather than two sets
- Improve exception handling / error messages for the ZooKeeper-based registry
- Slight improvement of error handling in the JobManagerRunner
- Compare enums with '==' (better null-pointer safety)
- Correct 'expected' and 'actual' parameters in 'assertEquals'
- Forward tests also to the HDFS file based registry test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0dede9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0dede9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0dede9f
Branch: refs/heads/master
Commit: e0dede9fb0a2ef7560254b6fc40d852ebf16c956
Parents: e7011d7
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 27 21:56:36 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 28 18:59:10 2017 +0100
----------------------------------------------------------------------
.../configuration/HighAvailabilityOptions.java | 4 ++
.../FsNegativeRunningJobsRegistryTest.java | 40 +++++++----
.../FsNegativeRunningJobsRegistry.java | 68 +++++++-----------
.../highavailability/RunningJobsRegistry.java | 39 ++++++-----
.../highavailability/ZookeeperRegistry.java | 73 ++++++++++----------
.../highavailability/nonha/NonHaRegistry.java | 45 ++++--------
.../runtime/jobmaster/JobManagerRunner.java | 20 +++---
.../minicluster/MiniClusterJobDispatcher.java | 2 +-
.../resourcemanager/JobLeaderIdService.java | 4 +-
.../FsNegativeRunningJobsRegistryTest.java | 37 +++++-----
.../highavailability/ZooKeeperRegistryTest.java | 33 ++++-----
11 files changed, 166 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 4792eba..b883bc3 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -124,6 +124,10 @@ public class HighAvailabilityOptions {
.defaultValue(3)
.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
+ public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH =
+ key("high-availability.zookeeper.job.registry")
+ .defaultValue("/running_job_registry/");
+
// ------------------------------------------------------------------------
/** Not intended to be instantiated */
http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
index 40d75e8..bb27b8b 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -33,9 +34,11 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+/**
+ * Tests for the {@link FsNegativeRunningJobsRegistry} on HDFS.
+ */
public class FsNegativeRunningJobsRegistryTest {
@ClassRule
@@ -83,20 +86,22 @@ public class FsNegativeRunningJobsRegistryTest {
FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir);
- // initially, without any call, the job is considered running
- assertTrue(registry.isJobRunning(jid));
+ // another registry should pick this up
+ FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
+
+ // initially, without any call, the job is pending
+ assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid));
+ assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid));
- // repeated setting should not affect the status
+ // after set running, the job is running
registry.setJobRunning(jid);
- assertTrue(registry.isJobRunning(jid));
+ assertEquals(JobSchedulingStatus.RUNNING, registry.getJobSchedulingStatus(jid));
+ assertEquals(JobSchedulingStatus.RUNNING, otherRegistry.getJobSchedulingStatus(jid));
// set the job to finished and validate
registry.setJobFinished(jid);
- assertFalse(registry.isJobRunning(jid));
-
- // another registry should pick this up
- FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
- assertFalse(otherRegistry.isJobRunning(jid));
+ assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
+ assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid));
}
@Test
@@ -108,14 +113,19 @@ public class FsNegativeRunningJobsRegistryTest {
// set the job to finished and validate
registry.setJobFinished(jid);
- assertFalse(registry.isJobRunning(jid));
+ assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
- // set the job to back to running and validate
+ // set the job to running does not overwrite the finished status
registry.setJobRunning(jid);
- assertTrue(registry.isJobRunning(jid));
+ assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
// another registry should pick this up
FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
- assertTrue(otherRegistry.isJobRunning(jid));
+ assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid));
+
+ // clear the running and finished marker, it will be pending
+ otherRegistry.clearJob(jid);
+ assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid));
+ assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
index 9e92263..cb79a65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.highavailability;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import java.io.FileNotFoundException;
@@ -30,19 +31,18 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* This {@link RunningJobsRegistry} tracks the status jobs via marker files,
- * marking running jobs via running marker files,
- * marking finished jobs via finished marker files.
+ * marking running jobs via running marker files, marking finished jobs via finished marker files.
*
* <p>The general contract is the following:
* <ul>
* <li>Initially, a marker file does not exist (no one created it, yet), which means
- * the specific job is pending</li>
+ * the specific job is pending.</li>
* <li>The first JobManager that granted leadership calls this service to create the running marker file,
* which marks the job as running.</li>
+ * <li>If a JobManager gains leadership but sees the running marker file,
+ * it will realize that the job has been scheduled already and needs reconciling.</li>
* <li>The JobManager that finishes calls this service to create the marker file,
* which marks the job as finished.</li>
- * <li>If a JobManager gains leadership but see the running marker file,
- * it will realize that the job has been scheduled and need reconciling.</li>
* <li>If a JobManager gains leadership at some point when shutdown is in progress,
* it will see the marker file and realize that the job is finished.</li>
* <li>The application framework is expected to clean the file once the application
@@ -50,7 +50,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* start the job, even if it gains leadership.</li>
* </ul>
*
- * <p>It is especially tailored towards deployment modes like for example
+ * <p>This registry is especially tailored towards deployment modes like for example
* YARN, where HDFS is available as a persistent file system, and the YARN
* application's working directories on HDFS are automatically cleaned
* up after the application completed.
@@ -99,8 +99,8 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
// to be safe, attempt to write to the working directory, to
// catch problems early
final Path testFile = new Path(workingDirectory, ".registry_test");
- try (FSDataOutputStream out = fileSystem.create(testFile, false)) {
- out.write(42);
+ try {
+ createFile(testFile, false);
}
catch (IOException e) {
throw new IOException("Unable to write to working directory: " + workingDirectory, e);
@@ -119,9 +119,7 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
// create the file
// to avoid an exception if the job already exists, set overwrite=true
- try (FSDataOutputStream out = fileSystem.create(filePath, true)) {
- out.write(42);
- }
+ createFile(filePath, true);
}
@Override
@@ -131,25 +129,7 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
// create the file
// to avoid an exception if the job already exists, set overwrite=true
- try (FSDataOutputStream out = fileSystem.create(filePath, true)) {
- out.write(42);
- }
- }
-
- @Override
- public boolean isJobRunning(JobID jobID) throws IOException {
- checkNotNull(jobID, "jobID");
-
- // check for the existence of the file
- try {
- fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID));
- // file was found --> job is running
- return true;
- }
- catch (FileNotFoundException e) {
- // file does not exist, job is not running
- return false;
- }
+ createFile(filePath, true);
}
@Override
@@ -157,21 +137,16 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
checkNotNull(jobID, "jobID");
// first check for the existence of the complete file
- try {
- fileSystem.getFileStatus(createMarkerFilePath(DONE_PREFIX, jobID));
+ if (fileSystem.exists(createMarkerFilePath(DONE_PREFIX, jobID))) {
// complete file was found --> job is terminated
return JobSchedulingStatus.DONE;
}
- catch (FileNotFoundException e) {
- // file does not exist, job is running or pending
- }
// check for the existence of the running file
- try {
- fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID));
+ else if (fileSystem.exists(createMarkerFilePath(RUNNING_PREFIX, jobID))) {
// running file was found --> job is running
return JobSchedulingStatus.RUNNING;
}
- catch (FileNotFoundException e) {
+ else {
// file does not exist, job is not scheduled
return JobSchedulingStatus.PENDING;
}
@@ -181,25 +156,30 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
public void clearJob(JobID jobID) throws IOException {
checkNotNull(jobID, "jobID");
final Path runningFilePath = createMarkerFilePath(RUNNING_PREFIX, jobID);
+ final Path doneFilePath = createMarkerFilePath(DONE_PREFIX, jobID);
// delete the running marker file, if it exists
try {
fileSystem.delete(runningFilePath, false);
}
- catch (FileNotFoundException e) {
- }
-
- final Path doneFilePath = createMarkerFilePath(DONE_PREFIX, jobID);
+ catch (FileNotFoundException ignored) {}
// delete the finished marker file, if it exists
try {
fileSystem.delete(doneFilePath, false);
}
- catch (FileNotFoundException e) {
- }
+ catch (FileNotFoundException ignored) {}
}
private Path createMarkerFilePath(String prefix, JobID jobId) {
return new Path(basePath, prefix + jobId.toString());
}
+
+ private void createFile(Path path, boolean overwrite) throws IOException {
+ final WriteMode writeMode = overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE;
+
+ try (FSDataOutputStream out = fileSystem.create(path, writeMode)) {
+ out.write(42);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
index 020f2ca..43e5ac5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
@@ -23,29 +23,40 @@ import org.apache.flink.api.common.JobID;
import java.io.IOException;
/**
- * This registry tracks if a certain job is running.
+ * A simple registry that tracks if a certain job is pending execution, running, or completed.
*
* <p>This registry is used in highly-available setups with multiple master nodes,
* to determine whether a new leader should attempt to recover a certain job (because the
* job is still running), or whether the job has already finished successfully (in case of a
* finite job) and the leader has only been granted leadership because the previous leader
* quit cleanly after the job was finished.
+ *
+ * <p>In addition, the registry can help to determine whether a newly assigned leader JobManager
+ * should attempt reconciliation with running TaskManagers, or immediately schedule the job from
+ * the latest checkpoint/savepoint.
*/
public interface RunningJobsRegistry {
- public enum JobSchedulingStatus {
- /** Job has not been scheduled */
+ /**
+ * The scheduling status of a job, as maintained by the {@code RunningJobsRegistry}.
+ */
+ enum JobSchedulingStatus {
+
+ /** Job has not been scheduled, yet */
PENDING,
- /** Job has been scheduled */
+ /** Job has been scheduled and is not yet finished */
RUNNING,
- /** Job has been finished */
+ /** Job has been finished, successfully or unsuccessfully */
DONE;
}
+ // ------------------------------------------------------------------------
+
/**
- * Marks a job as running.
+ * Marks a job as running. Requesting the job's status via the {@link #getJobSchedulingStatus(JobID)}
+ * method will return {@link JobSchedulingStatus#RUNNING}.
*
* @param jobID The id of the job.
*
@@ -55,7 +66,8 @@ public interface RunningJobsRegistry {
void setJobRunning(JobID jobID) throws IOException;
/**
- * Marks a job as running.
+ * Marks a job as completed. Requesting the job's status via the {@link #getJobSchedulingStatus(JobID)}
+ * method will return {@link JobSchedulingStatus#DONE}.
*
* @param jobID The id of the job.
*
@@ -65,18 +77,7 @@ public interface RunningJobsRegistry {
void setJobFinished(JobID jobID) throws IOException;
/**
- * Checks whether a job is running.
- *
- * @param jobID The id of the job to check.
- * @return True if the job is still running, false otherwise.
- *
- * @throws IOException Thrown when the communication with the highly-available storage or registry
- * failed and could not be retried.
- */
- boolean isJobRunning(JobID jobID) throws IOException;
-
- /**
- * Get the scheduing status of a job.
+ * Gets the scheduling status of a job.
*
* @param jobID The id of the job to check.
* @return The job scheduling status.
http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
index 31a4535..a8be35a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
@@ -26,6 +26,7 @@ import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.nio.charset.Charset;
+import java.util.Arrays;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -33,20 +34,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* A zookeeper based registry for running jobs, highly available.
*/
public class ZookeeperRegistry implements RunningJobsRegistry {
-
- private static final String DEFAULT_HA_JOB_REGISTRY_PATH = "/running_job_registry/";
+
+ private static final Charset ENCODING = Charset.forName("utf-8");
/** The ZooKeeper client to use */
private final CuratorFramework client;
private final String runningJobPath;
- private static final String HA_JOB_REGISTRY_PATH = "high-availability.zookeeper.job.registry";
-
public ZookeeperRegistry(final CuratorFramework client, final Configuration configuration) {
- this.client = client;
- runningJobPath = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT) +
- configuration.getString(HA_JOB_REGISTRY_PATH, DEFAULT_HA_JOB_REGISTRY_PATH);
+ this.client = checkNotNull(client, "client");
+ this.runningJobPath = configuration.getString(HighAvailabilityOptions.ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH);
}
@Override
@@ -54,12 +52,10 @@ public class ZookeeperRegistry implements RunningJobsRegistry {
checkNotNull(jobID);
try {
- String zkPath = runningJobPath + jobID.toString();
- this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
- this.client.setData().forPath(zkPath, JobSchedulingStatus.RUNNING.name().getBytes(Charset.forName("utf-8")));
+ writeEnumToZooKeeper(jobID, JobSchedulingStatus.RUNNING);
}
catch (Exception e) {
- throw new IOException("Set running state to zk fail for job " + jobID.toString(), e);
+ throw new IOException("Failed to set RUNNING state in ZooKeeper for job " + jobID, e);
}
}
@@ -68,44 +64,36 @@ public class ZookeeperRegistry implements RunningJobsRegistry {
checkNotNull(jobID);
try {
- String zkPath = runningJobPath + jobID.toString();
- this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
- this.client.setData().forPath(zkPath, JobSchedulingStatus.DONE.name().getBytes(Charset.forName("utf-8")));
+ writeEnumToZooKeeper(jobID, JobSchedulingStatus.DONE);
}
catch (Exception e) {
- throw new IOException("Set finished state to zk fail for job " + jobID.toString(), e);
+ throw new IOException("Failed to set DONE state in ZooKeeper for job " + jobID, e);
}
}
@Override
- public boolean isJobRunning(JobID jobID) throws IOException {
+ public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
checkNotNull(jobID);
try {
- Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString());
+ final String zkPath = createZkPath(jobID);
+ final Stat stat = client.checkExists().forPath(zkPath);
if (stat != null) {
- byte[] data = client.getData().forPath(runningJobPath + jobID.toString());
- if (JobSchedulingStatus.RUNNING.name().equals(new String(data))) {
- return true;
+ // found some data, try to parse it
+ final byte[] data = client.getData().forPath(zkPath);
+ if (data != null) {
+ try {
+ final String name = new String(data, ENCODING);
+ return JobSchedulingStatus.valueOf(name);
+ }
+ catch (IllegalArgumentException e) {
+ throw new IOException("Found corrupt data in ZooKeeper: " +
+ Arrays.toString(data) + " is no valid job status");
+ }
}
}
- return false;
- }
- catch (Exception e) {
- throw new IOException("Get running state from zk fail for job " + jobID.toString(), e);
- }
- }
- @Override
- public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
- checkNotNull(jobID);
-
- try {
- Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString());
- if (stat != null) {
- byte[] data = client.getData().forPath(runningJobPath + jobID.toString());
- return JobSchedulingStatus.valueOf(new String(data));
- }
+ // nothing found, yet, must be in status 'PENDING'
return JobSchedulingStatus.PENDING;
}
catch (Exception e) {
@@ -118,13 +106,22 @@ public class ZookeeperRegistry implements RunningJobsRegistry {
checkNotNull(jobID);
try {
- String zkPath = runningJobPath + jobID.toString();
+ final String zkPath = createZkPath(jobID);
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
this.client.delete().forPath(zkPath);
}
catch (Exception e) {
- throw new IOException("Clear job state from zk fail for " + jobID.toString(), e);
+ throw new IOException("Failed to clear job state from ZooKeeper for job " + jobID, e);
}
}
+ private String createZkPath(JobID jobID) {
+ return runningJobPath + jobID.toString();
+ }
+
+ private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
+ final String zkPath = createZkPath(jobID);
+ this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+ this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
index e331212..ab1ce47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
@@ -20,9 +20,8 @@ package org.apache.flink.runtime.highavailability.nonha;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
-import java.util.HashSet;
+import java.util.HashMap;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -32,18 +31,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class NonHaRegistry implements RunningJobsRegistry {
/** The currently running jobs */
- private final HashSet<JobID> running = new HashSet<>();
-
- /** The currently finished jobs */
- private final HashSet<JobID> finished = new HashSet<>();
+ private final HashMap<JobID, JobSchedulingStatus> jobStatus = new HashMap<>();
@Override
public void setJobRunning(JobID jobID) {
checkNotNull(jobID);
- synchronized (running) {
- running.add(jobID);
- finished.remove(jobID);
+ synchronized (jobStatus) {
+ jobStatus.put(jobID, JobSchedulingStatus.RUNNING);
}
}
@@ -51,33 +46,18 @@ public class NonHaRegistry implements RunningJobsRegistry {
public void setJobFinished(JobID jobID) {
checkNotNull(jobID);
- synchronized (running) {
- running.remove(jobID);
- finished.add(jobID);
- }
- }
-
- @Override
- public boolean isJobRunning(JobID jobID) {
- checkNotNull(jobID);
-
- synchronized (running) {
- return running.contains(jobID);
+ synchronized (jobStatus) {
+ jobStatus.put(jobID, JobSchedulingStatus.DONE);
}
}
@Override
public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
checkNotNull(jobID);
-
- synchronized (running) {
- if (finished.contains(jobID)) {
- return JobSchedulingStatus.DONE;
- } else if (running.contains(jobID)) {
- return JobSchedulingStatus.RUNNING;
- } else {
- return JobSchedulingStatus.PENDING;
- }
+
+ synchronized (jobStatus) {
+ JobSchedulingStatus status = jobStatus.get(jobID);
+ return status == null ? JobSchedulingStatus.PENDING : status;
}
}
@@ -85,9 +65,8 @@ public class NonHaRegistry implements RunningJobsRegistry {
public void clearJob(JobID jobID) {
checkNotNull(jobID);
- synchronized (running) {
- running.remove(jobID);
- finished.remove(jobID);
+ synchronized (jobStatus) {
+ jobStatus.remove(jobID);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 6bebd90..6e02813 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -360,20 +360,22 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
// it's okay that job manager wait for the operation complete
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
- JobSchedulingStatus schedulingStatus = JobSchedulingStatus.PENDING;
+ final JobSchedulingStatus schedulingStatus;
try {
schedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID());
- if (schedulingStatus.equals(JobSchedulingStatus.DONE)) {
- log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID());
- jobFinishedByOther();
- return;
- }
- } catch (Throwable t) {
- log.error("Could not access status (running/finished) of job {}. ", jobGraph.getJobID(), t);
+ }
+ catch (Throwable t) {
+ log.error("Could not access status (running/finished) of job {}. ", jobGraph.getJobID(), t);
onFatalError(t);
return;
}
+ if (schedulingStatus == JobSchedulingStatus.DONE) {
+ log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID());
+ jobFinishedByOther();
+ return;
+ }
+
// Double check the leadership after we confirm that, there is a small chance that multiple
// job managers schedule the same job after if they try to recover at the same time.
// This will eventually be noticed, but can not be ruled out from the beginning.
@@ -382,7 +384,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
// Now set the running status is after getting leader ship and
// set finished status after job in terminated status.
// So if finding the job is running, it means someone has already run the job, need recover.
- if (schedulingStatus.equals(JobSchedulingStatus.PENDING)) {
+ if (schedulingStatus == JobSchedulingStatus.PENDING) {
runningJobsRegistry.setJobRunning(jobGraph.getJobID());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 9178684..dd80ada 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -271,7 +271,7 @@ public class MiniClusterJobDispatcher {
haServices.getRunningJobsRegistry().clearJob(jobID);
}
catch (Throwable t) {
- LOG.warn("Could not clear the job {} at the high-availability services", jobID.toString(), t);
+ LOG.warn("Could not clear job {} at the status registry of the high-availability services", jobID, t);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 6c7e249..7ef39de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.ExceptionUtils;
@@ -244,7 +245,8 @@ public class JobLeaderIdService {
}
try {
- if (runningJobsRegistry.isJobRunning(jobId)) {
+ final JobSchedulingStatus jobStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
+ if (jobStatus == JobSchedulingStatus.PENDING || jobStatus == JobSchedulingStatus.RUNNING) {
if (leaderSessionId == null) {
// there is no new leader
if (previousJobLeaderId != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
index bbafcf0..b0c7778 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
@@ -30,9 +30,10 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the {@link FsNegativeRunningJobsRegistry} on a local file system.
+ */
public class FsNegativeRunningJobsRegistryTest extends TestLogger {
@Rule
@@ -47,23 +48,22 @@ public class FsNegativeRunningJobsRegistryTest extends TestLogger {
FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri));
+ // another registry should pick this up
+ FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
+
// initially, without any call, the job is pending
- assertFalse(registry.isJobRunning(jid));
- assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.PENDING);
+ assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid));
+ assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid));
// after set running, the job is running
registry.setJobRunning(jid);
- assertTrue(registry.isJobRunning(jid));
- assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.RUNNING);
+ assertEquals(JobSchedulingStatus.RUNNING, registry.getJobSchedulingStatus(jid));
+ assertEquals(JobSchedulingStatus.RUNNING, otherRegistry.getJobSchedulingStatus(jid));
// set the job to finished and validate
registry.setJobFinished(jid);
- assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
-
- // another registry should pick this up
- FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
- assertTrue(otherRegistry.isJobRunning(jid));
- assertEquals(otherRegistry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+ assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
+ assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid));
}
@Test
@@ -77,22 +77,19 @@ public class FsNegativeRunningJobsRegistryTest extends TestLogger {
// set the job to finished and validate
registry.setJobFinished(jid);
- assertFalse(registry.isJobRunning(jid));
- assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+ assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
// set the job to running does not overwrite the finished status
registry.setJobRunning(jid);
- assertTrue(registry.isJobRunning(jid));
- assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+ assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
// another registry should pick this up
FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
- assertTrue(otherRegistry.isJobRunning(jid));
- assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+ assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid));
// clear the running and finished marker, it will be pending
otherRegistry.clearJob(jid);
- assertFalse(otherRegistry.isJobRunning(jid));
- assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.PENDING);
+ assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid));
+ assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
index 8c91898..b1881e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
@@ -22,22 +22,21 @@ import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+
public class ZooKeeperRegistryTest extends TestLogger {
- private TestingServer testingServer;
- private static Logger LOG = LoggerFactory.getLogger(ZooKeeperRegistryTest.class);
+ private TestingServer testingServer;
@Before
public void before() throws Exception {
@@ -59,29 +58,25 @@ public class ZooKeeperRegistryTest extends TestLogger {
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
- HighAvailabilityServices zkHaService = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
- RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
+ final HighAvailabilityServices zkHaService = new ZookeeperHaServices(
+ ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration);
+
+ final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
try {
JobID jobID = JobID.generate();
- assertFalse(zkRegistry.isJobRunning(jobID));
- assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.PENDING);
+ assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
zkRegistry.setJobRunning(jobID);
- assertTrue(zkRegistry.isJobRunning(jobID));
- assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.RUNNING);
+ assertEquals(JobSchedulingStatus.RUNNING, zkRegistry.getJobSchedulingStatus(jobID));
zkRegistry.setJobFinished(jobID);
- assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.DONE);
- assertFalse(zkRegistry.isJobRunning(jobID));
+ assertEquals(JobSchedulingStatus.DONE, zkRegistry.getJobSchedulingStatus(jobID));
zkRegistry.clearJob(jobID);
- assertFalse(zkRegistry.isJobRunning(jobID));
- assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.PENDING);
+ assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
} finally {
- if (zkHaService != null) {
- zkHaService.close();
- }
+ zkHaService.close();
}
}
}