You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/12 14:43:30 UTC
[flink] 07/08: [hotfix][tests] Ensure that JobManagerRunners are stopped when Dispatcher loses leadership
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 05b11f9e74ca4c7e562d92eeff69fab8b88c6ee8
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Aug 17 15:24:45 2018 +0200
[hotfix][tests] Ensure that JobManagerRunners are stopped when Dispatcher loses leadership
---
.../flink/runtime/dispatcher/DispatcherHATest.java | 75 ++++++++++++++++++----
1 file changed, 64 insertions(+), 11 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index adf7618..cb26f48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -30,9 +30,11 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -111,8 +113,6 @@ public class DispatcherHATest extends TestLogger {
*/
@Test
public void testGrantingRevokingLeadership() throws Exception {
-
- final Configuration configuration = new Configuration();
final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
final JobGraph nonEmptyJobGraph = createNonEmptyJobGraph();
final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(nonEmptyJobGraph, null);
@@ -125,7 +125,34 @@ public class DispatcherHATest extends TestLogger {
final BlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
- final HATestingDispatcher dispatcher = new HATestingDispatcher(
+ final HATestingDispatcher dispatcher = createHADispatcher(highAvailabilityServices, fencingTokens);
+
+ dispatcher.start();
+
+ try {
+ final UUID leaderId = UUID.randomUUID();
+ dispatcherLeaderElectionService.isLeader(leaderId);
+
+ dispatcherLeaderElectionService.notLeader();
+
+ final DispatcherId firstFencingToken = fencingTokens.take();
+
+ assertThat(firstFencingToken, equalTo(NULL_FENCING_TOKEN));
+
+ enterGetJobIdsLatch.await();
+ proceedGetJobIdsLatch.trigger();
+
+ assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
+
+ } finally {
+ RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+ }
+ }
+
+ @Nonnull
+ private HATestingDispatcher createHADispatcher(TestingHighAvailabilityServices highAvailabilityServices, BlockingQueue<DispatcherId> fencingTokens) throws Exception {
+ final Configuration configuration = new Configuration();
+ return new HATestingDispatcher(
rpcService,
UUID.randomUUID().toString(),
configuration,
@@ -139,24 +166,50 @@ public class DispatcherHATest extends TestLogger {
new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
testingFatalErrorHandler,
fencingTokens);
+ }
+
+ /**
+ * Tests that all JobManagerRunners are terminated if the leadership of the
+ * Dispatcher is revoked.
+ */
+ @Test
+ public void testRevokeLeadershipTerminatesJobManagerRunners() throws Exception {
+
+ final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+ highAvailabilityServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore());
+
+ final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+ highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+ final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
+ final HATestingDispatcher dispatcher = createHADispatcher(
+ highAvailabilityServices,
+ fencingTokens);
dispatcher.start();
try {
- final UUID leaderId = UUID.randomUUID();
- dispatcherLeaderElectionService.isLeader(leaderId);
+ // grant leadership and submit a single job
+ final DispatcherId expectedDispatcherId = DispatcherId.generate();
- dispatcherLeaderElectionService.notLeader();
+ leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
- final DispatcherId firstFencingToken = fencingTokens.take();
+ assertThat(fencingTokens.take(), is(equalTo(expectedDispatcherId)));
- assertThat(firstFencingToken, equalTo(NULL_FENCING_TOKEN));
+ final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
- enterGetJobIdsLatch.await();
- proceedGetJobIdsLatch.trigger();
+ final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(createNonEmptyJobGraph(), timeout);
- assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
+ submissionFuture.get();
+
+ assertThat(dispatcher.getNumberJobs(timeout).get(), is(1));
+ // revoke the leadership --> this should stop all running JobManagerRunners
+ leaderElectionService.notLeader();
+
+ assertThat(fencingTokens.take(), is(equalTo(NULL_FENCING_TOKEN)));
+
+ assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
} finally {
RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
}