You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/24 14:49:46 UTC
[2/4] flink git commit: [FLINK-9421] Remove job from RunningJobsRegistry when it reaches a terminal state
[FLINK-9421] Remove job from RunningJobsRegistry when it reaches a terminal state
This commit makes the Dispatcher remove a job's RunningJobsRegistry entry once the job has completed
and is removed from the Dispatcher.
This closes #6068.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ad0ca23
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ad0ca23
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ad0ca23
Branch: refs/heads/master
Commit: 5ad0ca2392a9672a92756337243834d1b466a24d
Parents: 47dc699
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 23 23:48:38 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 24 16:45:15 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/dispatcher/Dispatcher.java | 6 ++
.../highavailability/RunningJobsRegistry.java | 24 +++---
.../StandaloneRunningJobsRegistry.java | 4 +-
.../zookeeper/ZooKeeperRunningJobsRegistry.java | 7 +-
.../DispatcherResourceCleanupTest.java | 88 +++++++++++++++++++-
.../TestingHighAvailabilityServices.java | 5 +-
6 files changed, 115 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5ad0ca23/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5022388..f5fdd27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -563,6 +563,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
} catch (Exception e) {
log.warn("Could not properly remove job {} from submitted job graph store.", jobId);
}
+
+ try {
+ runningJobsRegistry.clearJob(jobId);
+ } catch (IOException e) {
+ log.warn("Could not properly remove job {} from the running jobs registry.", jobId);
+ }
}
},
getRpcService().getExecutor());
http://git-wip-us.apache.org/repos/asf/flink/blob/5ad0ca23/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
index 43e5ac5..2e24f27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
@@ -24,16 +24,16 @@ import java.io.IOException;
/**
* A simple registry that tracks if a certain job is pending execution, running, or completed.
- *
+ *
* <p>This registry is used in highly-available setups with multiple master nodes,
- * to determine whether a new leader should attempt to recover a certain job (because the
+ * to determine whether a new leader should attempt to recover a certain job (because the
* job is still running), or whether the job has already finished successfully (in case of a
* finite job) and the leader has only been granted leadership because the previous leader
* quit cleanly after the job was finished.
- *
+ *
* <p>In addition, the registry can help to determine whether a newly assigned leader JobManager
* should attempt reconciliation with running TaskManagers, or immediately schedule the job from
- * the latest checkpoint/savepoint.
+ * the latest checkpoint/savepoint.
*/
public interface RunningJobsRegistry {
@@ -42,13 +42,13 @@ public interface RunningJobsRegistry {
*/
enum JobSchedulingStatus {
- /** Job has not been scheduled, yet */
+ /** Job has not been scheduled, yet. */
PENDING,
- /** Job has been scheduled and is not yet finished */
+ /** Job has been scheduled and is not yet finished. */
RUNNING,
- /** Job has been finished, successfully or unsuccessfully */
+ /** Job has been finished, successfully or unsuccessfully. */
DONE;
}
@@ -57,7 +57,7 @@ public interface RunningJobsRegistry {
/**
* Marks a job as running. Requesting the job's status via the {@link #getJobSchedulingStatus(JobID)}
* method will return {@link JobSchedulingStatus#RUNNING}.
- *
+ *
* @param jobID The id of the job.
*
* @throws IOException Thrown when the communication with the highly-available storage or registry
@@ -70,7 +70,7 @@ public interface RunningJobsRegistry {
* method will return {@link JobSchedulingStatus#DONE}.
*
* @param jobID The id of the job.
- *
+ *
* @throws IOException Thrown when the communication with the highly-available storage or registry
* failed and could not be retried.
*/
@@ -81,17 +81,17 @@ public interface RunningJobsRegistry {
*
* @param jobID The id of the job to check.
* @return The job scheduling status.
- *
+ *
* @throws IOException Thrown when the communication with the highly-available storage or registry
* failed and could not be retried.
*/
JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException;
/**
- * Clear job state form the registry, usually called after job finish
+ * Clear job state from the registry, usually called after the job has finished.
*
* @param jobID The id of the job to check.
- *
+ *
* @throws IOException Thrown when the communication with the highly-available storage or registry
* failed and could not be retried.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/5ad0ca23/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
index 585ef34..d289cc6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
@@ -30,7 +30,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class StandaloneRunningJobsRegistry implements RunningJobsRegistry {
- /** The currently running jobs */
+ /** The currently running jobs. */
private final HashMap<JobID, JobSchedulingStatus> jobStatus = new HashMap<>();
@Override
@@ -54,7 +54,7 @@ public class StandaloneRunningJobsRegistry implements RunningJobsRegistry {
@Override
public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
checkNotNull(jobID);
-
+
synchronized (jobStatus) {
JobSchedulingStatus status = jobStatus.get(jobID);
return status == null ? JobSchedulingStatus.PENDING : status;
http://git-wip-us.apache.org/repos/asf/flink/blob/5ad0ca23/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
index 8a083d1..1a84b37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
@@ -18,11 +18,12 @@
package org.apache.flink.runtime.highavailability.zookeeper;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+
+import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
@@ -38,7 +39,7 @@ public class ZooKeeperRunningJobsRegistry implements RunningJobsRegistry {
private static final Charset ENCODING = Charset.forName("utf-8");
- /** The ZooKeeper client to use */
+ /** The ZooKeeper client to use. */
private final CuratorFramework client;
private final String runningJobPath;
@@ -88,7 +89,7 @@ public class ZooKeeperRunningJobsRegistry implements RunningJobsRegistry {
return JobSchedulingStatus.valueOf(name);
}
catch (IllegalArgumentException e) {
- throw new IOException("Found corrupt data in ZooKeeper: " +
+ throw new IOException("Found corrupt data in ZooKeeper: " +
Arrays.toString(data) + " is no valid job status");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ad0ca23/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index 77b3fc4..f768251 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -54,6 +56,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
@@ -64,6 +67,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
@@ -101,6 +105,12 @@ public class DispatcherResourceCleanupTest extends TestLogger {
private TestingLeaderElectionService dispatcherLeaderElectionService;
+ private SingleRunningJobsRegistry runningJobsRegistry;
+
+ private TestingHighAvailabilityServices highAvailabilityServices;
+
+ private OneShotLatch clearedJobLatch;
+
private TestingDispatcher dispatcher;
private DispatcherGateway dispatcherGateway;
@@ -134,9 +144,12 @@ public class DispatcherResourceCleanupTest extends TestLogger {
configuration = new Configuration();
configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
- final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+ highAvailabilityServices = new TestingHighAvailabilityServices();
dispatcherLeaderElectionService = new TestingLeaderElectionService();
highAvailabilityServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
+ clearedJobLatch = new OneShotLatch();
+ runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
+ highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
storedBlobFuture = new CompletableFuture<>();
deleteAllFuture = new CompletableFuture<>();
@@ -271,6 +284,79 @@ public class DispatcherResourceCleanupTest extends TestLogger {
assertThat(deleteAllFuture.isDone(), is(false));
}
+ /**
+ * Verifies that the Dispatcher removes a job's {@link RunningJobsRegistry} entry once the
+ * job has reached a terminal state (here: {@link JobStatus#FINISHED}).
+ */
+ @Test
+ public void testRunningJobsRegistryCleanup() throws Exception {
+ submitJob();
+
+ // register the job as running so that there is an entry which has to be cleared
+ runningJobsRegistry.setJobRunning(jobId);
+ assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+ // complete the job with a FINISHED result, i.e. drive it into a terminal state
+ final ArchivedExecutionGraph finishedExecutionGraph = new ArchivedExecutionGraphBuilder()
+ .setState(JobStatus.FINISHED)
+ .setJobID(jobId)
+ .build();
+ resultFuture.complete(finishedExecutionGraph);
+
+ // the Dispatcher calls clearJob asynchronously on its executor; the registry triggers this latch there
+ clearedJobLatch.await();
+
+ assertThat(runningJobsRegistry.contains(jobId), is(false));
+ }
+
+ /**
+ * {@link RunningJobsRegistry} test double which only accepts a single, expected {@link JobID}
+ * and triggers a latch once the entry of that job has been cleared.
+ *
+ * <p>Calling any method with a different job id fails with an {@link IllegalArgumentException}.
+ */
+ private static final class SingleRunningJobsRegistry implements RunningJobsRegistry {
+
+ /** The only job id this registry accepts. */
+ @Nonnull
+ private final JobID expectedJobId;
+
+ /** Triggered by {@link #clearJob(JobID)}. */
+ @Nonnull
+ private final OneShotLatch clearedJobLatch;
+
+ /** Scheduling status of the expected job; {@code PENDING} while there is no entry for it. */
+ private JobSchedulingStatus jobSchedulingStatus = JobSchedulingStatus.PENDING;
+
+ /** Whether the registry currently holds an entry for the expected job. */
+ private boolean containsJob = false;
+
+ private SingleRunningJobsRegistry(@Nonnull JobID expectedJobId, @Nonnull OneShotLatch clearedJobLatch) {
+ this.expectedJobId = expectedJobId;
+ this.clearedJobLatch = clearedJobLatch;
+ }
+
+ @Override
+ public void setJobRunning(JobID jobID) {
+ checkJobId(jobID);
+ containsJob = true;
+ jobSchedulingStatus = JobSchedulingStatus.RUNNING;
+ }
+
+ /**
+ * Fails with an {@link IllegalArgumentException} if the given id is not the expected job id.
+ */
+ private void checkJobId(JobID jobID) {
+ Preconditions.checkArgument(
+ expectedJobId.equals(jobID),
+ "Unexpected job id %s, expected %s.",
+ jobID,
+ expectedJobId);
+ }
+
+ @Override
+ public void setJobFinished(JobID jobID) {
+ checkJobId(jobID);
+ containsJob = true;
+ jobSchedulingStatus = JobSchedulingStatus.DONE;
+ }
+
+ @Override
+ public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
+ checkJobId(jobID);
+ return jobSchedulingStatus;
+ }
+
+ /**
+ * Returns whether the registry currently holds an entry for the given (expected) job.
+ */
+ public boolean contains(JobID jobId) {
+ checkJobId(jobId);
+ return containsJob;
+ }
+
+ /**
+ * Removes the entry of the expected job and then triggers the cleared-job latch.
+ */
+ @Override
+ public void clearJob(JobID jobID) {
+ checkJobId(jobID);
+ containsJob = false;
+ // a job without an entry is reported as PENDING (as StandaloneRunningJobsRegistry does),
+ // so getJobSchedulingStatus must not keep returning the stale RUNNING/DONE status
+ jobSchedulingStatus = JobSchedulingStatus.PENDING;
+ // trigger last so that a thread awaiting the latch observes the cleared state
+ clearedJobLatch.trigger();
+ }
+ }
+
private static final class TestingDispatcher extends Dispatcher {
public TestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, HistoryServerArchivist historyServerArchivist) throws Exception {
super(rpcService, endpointId, configuration, highAvailabilityServices, submittedJobGraphStore, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricServiceQueryPath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, restAddress, historyServerArchivist);
http://git-wip-us.apache.org/repos/asf/flink/blob/5ad0ca23/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 98d5d74..2489f16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -56,7 +56,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
private volatile SubmittedJobGraphStore submittedJobGraphStore;
- private final RunningJobsRegistry runningJobsRegistry = new StandaloneRunningJobsRegistry();
+ private volatile RunningJobsRegistry runningJobsRegistry = new StandaloneRunningJobsRegistry();
// ------------------------------------------------------------------------
// Setters for mock / testing implementations
@@ -102,6 +102,9 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
this.submittedJobGraphStore = submittedJobGraphStore;
}
+ /**
+ * Replaces the {@link RunningJobsRegistry} stored in these testing services.
+ *
+ * @param runningJobsRegistry the registry to store from now on
+ */
+ public void setRunningJobsRegistry(RunningJobsRegistry runningJobsRegistry) {
+ this.runningJobsRegistry = runningJobsRegistry;
+ }
// ------------------------------------------------------------------------
// HA Services Methods
// ------------------------------------------------------------------------