You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/12 14:43:30 UTC

[flink] 07/08: [hotfix][tests] Ensure that JobManagerRunners are stopped when Dispatcher loses leadership

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 05b11f9e74ca4c7e562d92eeff69fab8b88c6ee8
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Aug 17 15:24:45 2018 +0200

    [hotfix][tests] Ensure that JobManagerRunners are stopped when Dispatcher loses leadership
---
 .../flink/runtime/dispatcher/DispatcherHATest.java | 75 ++++++++++++++++++----
 1 file changed, 64 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index adf7618..cb26f48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -30,9 +30,11 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -111,8 +113,6 @@ public class DispatcherHATest extends TestLogger {
 	 */
 	@Test
 	public void testGrantingRevokingLeadership() throws Exception {
-
-		final Configuration configuration = new Configuration();
 		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		final JobGraph nonEmptyJobGraph = createNonEmptyJobGraph();
 		final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(nonEmptyJobGraph, null);
@@ -125,7 +125,34 @@ public class DispatcherHATest extends TestLogger {
 
 		final BlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
 
-		final HATestingDispatcher dispatcher = new HATestingDispatcher(
+		final HATestingDispatcher dispatcher = createHADispatcher(highAvailabilityServices, fencingTokens);
+
+		dispatcher.start();
+
+		try {
+			final UUID leaderId = UUID.randomUUID();
+			dispatcherLeaderElectionService.isLeader(leaderId);
+
+			dispatcherLeaderElectionService.notLeader();
+
+			final DispatcherId firstFencingToken = fencingTokens.take();
+
+			assertThat(firstFencingToken, equalTo(NULL_FENCING_TOKEN));
+
+			enterGetJobIdsLatch.await();
+			proceedGetJobIdsLatch.trigger();
+
+			assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
+
+		} finally {
+			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+		}
+	}
+
+	@Nonnull
+	private HATestingDispatcher createHADispatcher(TestingHighAvailabilityServices highAvailabilityServices, BlockingQueue<DispatcherId> fencingTokens) throws Exception {
+		final Configuration configuration = new Configuration();
+		return new HATestingDispatcher(
 			rpcService,
 			UUID.randomUUID().toString(),
 			configuration,
@@ -139,24 +166,50 @@ public class DispatcherHATest extends TestLogger {
 			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
 			testingFatalErrorHandler,
 			fencingTokens);
+	}
+
+	/**
+	 * Tests that all JobManagerRunners are terminated if the leadership of the
+	 * Dispatcher is revoked.
+	 */
+	@Test
+	public void testRevokeLeadershipTerminatesJobManagerRunners() throws Exception {
+
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		highAvailabilityServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore());
+
+		final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+		final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
+		final HATestingDispatcher dispatcher = createHADispatcher(
+			highAvailabilityServices,
+			fencingTokens);
 
 		dispatcher.start();
 
 		try {
-			final UUID leaderId = UUID.randomUUID();
-			dispatcherLeaderElectionService.isLeader(leaderId);
+			// grant leadership and submit a single job
+			final DispatcherId expectedDispatcherId = DispatcherId.generate();
 
-			dispatcherLeaderElectionService.notLeader();
+			leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
 
-			final DispatcherId firstFencingToken = fencingTokens.take();
+			assertThat(fencingTokens.take(), is(equalTo(expectedDispatcherId)));
 
-			assertThat(firstFencingToken, equalTo(NULL_FENCING_TOKEN));
+			final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-			enterGetJobIdsLatch.await();
-			proceedGetJobIdsLatch.trigger();
+			final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(createNonEmptyJobGraph(), timeout);
 
-			assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
+			submissionFuture.get();
+
+			assertThat(dispatcher.getNumberJobs(timeout).get(), is(1));
 
+			// revoke the leadership --> this should stop all running JobManagerRunners
+			leaderElectionService.notLeader();
+
+			assertThat(fencingTokens.take(), is(equalTo(NULL_FENCING_TOKEN)));
+
+			assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
 		} finally {
 			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
 		}