You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/24 14:49:46 UTC

[2/4] flink git commit: [FLINK-9421] Remove job from RunningJobsRegistry when it reaches a terminal state

[FLINK-9421] Remove job from RunningJobsRegistry when it reaches a terminal state

This commit lets the Dispatcher remove the RunningJobsRegistry entry for a completed job
when the job is removed from the Dispatcher.

This closes #6068.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ad0ca23
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ad0ca23
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ad0ca23

Branch: refs/heads/master
Commit: 5ad0ca2392a9672a92756337243834d1b466a24d
Parents: 47dc699
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 23 23:48:38 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 24 16:45:15 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  6 ++
 .../highavailability/RunningJobsRegistry.java   | 24 +++---
 .../StandaloneRunningJobsRegistry.java          |  4 +-
 .../zookeeper/ZooKeeperRunningJobsRegistry.java |  7 +-
 .../DispatcherResourceCleanupTest.java          | 88 +++++++++++++++++++-
 .../TestingHighAvailabilityServices.java        |  5 +-
 6 files changed, 115 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ad0ca23/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5022388..f5fdd27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -563,6 +563,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 					} catch (Exception e) {
 						log.warn("Could not properly remove job {} from submitted job graph store.", jobId);
 					}
+
+					try {
+						runningJobsRegistry.clearJob(jobId);
+					} catch (IOException e) {
+						log.warn("Could not properly remove job {} from the running jobs registry.", jobId);
+					}
 				}
 			},
 			getRpcService().getExecutor());

http://git-wip-us.apache.org/repos/asf/flink/blob/5ad0ca23/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
index 43e5ac5..2e24f27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
@@ -24,16 +24,16 @@ import java.io.IOException;
 
 /**
  * A simple registry that tracks if a certain job is pending execution, running, or completed.
- * 
+ *
  * <p>This registry is used in highly-available setups with multiple master nodes,
- * to determine whether a new leader should attempt to recover a certain job (because the 
+ * to determine whether a new leader should attempt to recover a certain job (because the
  * job is still running), or whether the job has already finished successfully (in case of a
  * finite job) and the leader has only been granted leadership because the previous leader
  * quit cleanly after the job was finished.
- * 
+ *
  * <p>In addition, the registry can help to determine whether a newly assigned leader JobManager
  * should attempt reconciliation with running TaskManagers, or immediately schedule the job from
- * the latest checkpoint/savepoint. 
+ * the latest checkpoint/savepoint.
  */
 public interface RunningJobsRegistry {
 
@@ -42,13 +42,13 @@ public interface RunningJobsRegistry {
 	 */
 	enum JobSchedulingStatus {
 
-		/** Job has not been scheduled, yet */
+		/** Job has not been scheduled, yet. */
 		PENDING,
 
-		/** Job has been scheduled and is not yet finished */
+		/** Job has been scheduled and is not yet finished. */
 		RUNNING,
 
-		/** Job has been finished, successfully or unsuccessfully */
+		/** Job has been finished, successfully or unsuccessfully. */
 		DONE;
 	}
 
@@ -57,7 +57,7 @@ public interface RunningJobsRegistry {
 	/**
 	 * Marks a job as running. Requesting the job's status via the {@link #getJobSchedulingStatus(JobID)}
 	 * method will return {@link JobSchedulingStatus#RUNNING}.
-	 * 
+	 *
 	 * @param jobID The id of the job.
 	 *
 	 * @throws IOException Thrown when the communication with the highly-available storage or registry
@@ -70,7 +70,7 @@ public interface RunningJobsRegistry {
 	 * method will return {@link JobSchedulingStatus#DONE}.
 	 *
 	 * @param jobID The id of the job.
-	 * 
+	 *
 	 * @throws IOException Thrown when the communication with the highly-available storage or registry
 	 *                     failed and could not be retried.
 	 */
@@ -81,17 +81,17 @@ public interface RunningJobsRegistry {
 	 *
 	 * @param jobID The id of the job to check.
 	 * @return The job scheduling status.
-	 * 
+	 *
 	 * @throws IOException Thrown when the communication with the highly-available storage or registry
 	 *                     failed and could not be retried.
 	 */
 	JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException;
 
 	/**
-	 * Clear job state form the registry, usually called after job finish
+	 * Clear job state form the registry, usually called after job finish.
 	 *
 	 * @param jobID The id of the job to check.
-	 * 
+	 *
 	 * @throws IOException Thrown when the communication with the highly-available storage or registry
 	 *                     failed and could not be retried.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/5ad0ca23/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
index 585ef34..d289cc6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
@@ -30,7 +30,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class StandaloneRunningJobsRegistry implements RunningJobsRegistry {
 
-	/** The currently running jobs */
+	/** The currently running jobs. */
 	private final HashMap<JobID, JobSchedulingStatus> jobStatus = new HashMap<>();
 
 	@Override
@@ -54,7 +54,7 @@ public class StandaloneRunningJobsRegistry implements RunningJobsRegistry {
 	@Override
 	public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
 		checkNotNull(jobID);
-		
+
 		synchronized (jobStatus) {
 			JobSchedulingStatus status = jobStatus.get(jobID);
 			return status == null ? JobSchedulingStatus.PENDING : status;

http://git-wip-us.apache.org/repos/asf/flink/blob/5ad0ca23/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
index 8a083d1..1a84b37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.runtime.highavailability.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 
 import java.io.IOException;
@@ -38,7 +39,7 @@ public class ZooKeeperRunningJobsRegistry implements RunningJobsRegistry {
 
 	private static final Charset ENCODING = Charset.forName("utf-8");
 
-	/** The ZooKeeper client to use */
+	/** The ZooKeeper client to use. */
 	private final CuratorFramework client;
 
 	private final String runningJobPath;
@@ -88,7 +89,7 @@ public class ZooKeeperRunningJobsRegistry implements RunningJobsRegistry {
 						return JobSchedulingStatus.valueOf(name);
 					}
 					catch (IllegalArgumentException e) {
-						throw new IOException("Found corrupt data in ZooKeeper: " + 
+						throw new IOException("Found corrupt data in ZooKeeper: " +
 								Arrays.toString(data) + " is no valid job status");
 					}
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ad0ca23/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index 77b3fc4..f768251 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobStore;
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -54,6 +56,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -64,6 +67,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.File;
@@ -101,6 +105,12 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
 	private TestingLeaderElectionService dispatcherLeaderElectionService;
 
+	private SingleRunningJobsRegistry runningJobsRegistry;
+
+	private TestingHighAvailabilityServices highAvailabilityServices;
+
+	private OneShotLatch clearedJobLatch;
+
 	private TestingDispatcher dispatcher;
 
 	private DispatcherGateway dispatcherGateway;
@@ -134,9 +144,12 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		configuration = new Configuration();
 		configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
-		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		highAvailabilityServices = new TestingHighAvailabilityServices();
 		dispatcherLeaderElectionService = new TestingLeaderElectionService();
 		highAvailabilityServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
+		clearedJobLatch = new OneShotLatch();
+		runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
+		highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
 
 		storedBlobFuture = new CompletableFuture<>();
 		deleteAllFuture = new CompletableFuture<>();
@@ -271,6 +284,79 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		assertThat(deleteAllFuture.isDone(), is(false));
 	}
 
+	/**
+	 * Tests that the {@link RunningJobsRegistry} entries are cleared after the
+	 * job reached a terminal state.
+	 */
+	@Test
+	public void testRunningJobsRegistryCleanup() throws Exception {
+		submitJob();
+
+		runningJobsRegistry.setJobRunning(jobId);
+		assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+		resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+		// wait for the clearing
+		clearedJobLatch.await();
+
+		assertThat(runningJobsRegistry.contains(jobId), is(false));
+	}
+
+	private static final class SingleRunningJobsRegistry implements RunningJobsRegistry {
+
+		@Nonnull
+		private final JobID expectedJobId;
+
+		@Nonnull
+		private final OneShotLatch clearedJobLatch;
+
+		private JobSchedulingStatus jobSchedulingStatus = JobSchedulingStatus.PENDING;
+
+		private boolean containsJob = false;
+
+		private SingleRunningJobsRegistry(@Nonnull JobID expectedJobId, @Nonnull OneShotLatch clearedJobLatch) {
+			this.expectedJobId = expectedJobId;
+			this.clearedJobLatch = clearedJobLatch;
+		}
+
+		@Override
+		public void setJobRunning(JobID jobID) {
+			checkJobId(jobID);
+			containsJob = true;
+			jobSchedulingStatus = JobSchedulingStatus.RUNNING;
+		}
+
+		private void checkJobId(JobID jobID) {
+			Preconditions.checkArgument(expectedJobId.equals(jobID));
+		}
+
+		@Override
+		public void setJobFinished(JobID jobID) {
+			checkJobId(jobID);
+			containsJob = true;
+			jobSchedulingStatus = JobSchedulingStatus.DONE;
+		}
+
+		@Override
+		public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
+			checkJobId(jobID);
+			return jobSchedulingStatus;
+		}
+
+		public boolean contains(JobID jobId) {
+			checkJobId(jobId);
+			return containsJob;
+		}
+
+		@Override
+		public void clearJob(JobID jobID) {
+			checkJobId(jobID);
+			containsJob = false;
+			clearedJobLatch.trigger();
+		}
+	}
+
 	private static final class TestingDispatcher extends Dispatcher {
 		public TestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, HistoryServerArchivist historyServerArchivist) throws Exception {
 			super(rpcService, endpointId, configuration, highAvailabilityServices, submittedJobGraphStore, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricServiceQueryPath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, restAddress, historyServerArchivist);

http://git-wip-us.apache.org/repos/asf/flink/blob/5ad0ca23/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 98d5d74..2489f16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -56,7 +56,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private volatile SubmittedJobGraphStore submittedJobGraphStore;
 
-	private final RunningJobsRegistry runningJobsRegistry = new StandaloneRunningJobsRegistry();
+	private volatile RunningJobsRegistry runningJobsRegistry = new StandaloneRunningJobsRegistry();
 
 	// ------------------------------------------------------------------------
 	//  Setters for mock / testing implementations
@@ -102,6 +102,9 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 		this.submittedJobGraphStore = submittedJobGraphStore;
 	}
 
+	public void setRunningJobsRegistry(RunningJobsRegistry runningJobsRegistry) {
+		this.runningJobsRegistry = runningJobsRegistry;
+	}
 	// ------------------------------------------------------------------------
 	//  HA Services Methods
 	// ------------------------------------------------------------------------