You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/03 21:01:37 UTC
flink git commit: [FLINK-7457] Make Dispatcher highly available
Repository: flink
Updated Branches:
refs/heads/master a00830318 -> fb3bd1fce
[FLINK-7457] Make Dispatcher highly available
This commit adds a dispatcher leader election service and a dispatcher leader retrieval service to the
HighAvailabilityServices. Moreover, it lets the Dispatcher take part in the leader election
process using the aforementioned services.
Let Dispatcher participate in leader election
Add test for Dispatcher leader election
This closes #4548.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb3bd1fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb3bd1fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb3bd1fc
Branch: refs/heads/master
Commit: fb3bd1fceba6da362966491e55c3bf27566ede53
Parents: a008303
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 16 14:36:13 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Sep 3 22:59:25 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/dispatcher/Dispatcher.java | 152 ++++++++++++++++++-
.../runtime/dispatcher/DispatcherException.java | 40 +++++
.../runtime/dispatcher/DispatcherGateway.java | 3 +
.../HighAvailabilityServices.java | 13 ++
.../HighAvailabilityServicesUtils.java | 12 +-
.../nonha/embedded/EmbeddedHaServices.java | 13 ++
.../nonha/standalone/StandaloneHaServices.java | 27 +++-
.../zookeeper/ZooKeeperHaServices.java | 12 ++
.../org/apache/flink/runtime/rpc/RpcUtils.java | 22 +++
.../runtime/dispatcher/DispatcherTest.java | 114 +++++++++++---
.../TestingHighAvailabilityServices.java | 33 ++++
.../TestingManualHighAvailabilityServices.java | 13 ++
.../standalone/StandaloneHaServicesTest.java | 6 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 2 +
.../YarnIntraNonHaMasterServices.java | 25 +++
.../YarnPreConfiguredMasterNonHaServices.java | 32 ++++
16 files changed, 492 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index bb0b3e4..29262cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -28,17 +28,21 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
@@ -47,6 +51,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
/**
@@ -55,7 +60,7 @@ import java.util.concurrent.CompletableFuture;
* the jobs and to recover them in case of a master failure. Furthermore, it knows
* about the state of the Flink session cluster.
*/
-public abstract class Dispatcher extends RpcEndpoint implements DispatcherGateway {
+public abstract class Dispatcher extends RpcEndpoint implements DispatcherGateway, LeaderContender {
public static final String DISPATCHER_NAME = "dispatcher";
@@ -73,6 +78,10 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
private final Map<JobID, JobManagerRunner> jobManagerRunners;
+ private final LeaderElectionService leaderElectionService;
+
+ private volatile UUID leaderSessionId;
+
protected Dispatcher(
RpcService rpcService,
String endpointId,
@@ -95,6 +104,11 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
jobManagerRunners = new HashMap<>(16);
+
+ leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
+
+ // we are not the leader when this object is created
+ leaderSessionId = null;
}
//------------------------------------------------------
@@ -104,12 +118,8 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
@Override
public void postStop() throws Exception {
Exception exception = null;
- // stop all currently running JobManagerRunners
- for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
- jobManagerRunner.shutdown();
- }
- jobManagerRunners.clear();
+ clearState();
try {
submittedJobGraphStore.stop();
@@ -118,6 +128,12 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
}
try {
+ leaderElectionService.stop();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ try {
super.postStop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
@@ -128,12 +144,26 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
}
}
+ /**
+ * Starts this RPC endpoint and afterwards registers the dispatcher as a contender with
+ * the dispatcher leader election service.
+ *
+ * @throws Exception if starting the endpoint or the leader election service fails
+ */
+ @Override
+ public void start() throws Exception {
+ super.start();
+
+ // the election is started only after super.start(); the election callbacks
+ // (grantLeadership/revokeLeadership) enqueue their work via runAsync, which
+ // presumably needs a started endpoint
+ leaderElectionService.start(this);
+ }
+
//------------------------------------------------------
// RPCs
//------------------------------------------------------
@Override
- public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
+ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, UUID leaderSessionId, Time timeout) {
+
+ try {
+ validateLeaderSessionId(leaderSessionId);
+ } catch (LeaderIdMismatchException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+
final JobID jobId = jobGraph.getJobID();
log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
@@ -224,6 +254,62 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
// TODO: remove job related files from blob server
}
+ /**
+ * Clears the state of the dispatcher.
+ *
+ * <p>The state consists of all currently running jobs: every {@link JobManagerRunner}
+ * is shut down and the map of runners is emptied.
+ */
+ private void clearState() {
+ // stop all currently running JobManagerRunners since they run in the same process
+ for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
+ jobManagerRunner.shutdown();
+ }
+
+ jobManagerRunners.clear();
+ }
+
+ /**
+ * Recovers all jobs persisted via the submitted job graph store.
+ *
+ * <p>The store is read on the RPC service's executor ({@code getRpcService().execute}),
+ * presumably to keep potentially blocking store access out of the endpoint's own
+ * execution. Each recovered job graph is then resubmitted via {@code runAsync}. The
+ * resubmission uses the leader session id captured when recovery was triggered, so it is
+ * rejected by {@link #validateLeaderSessionId(UUID)} if a different leader session has
+ * been granted in the meantime.
+ *
+ * <p>If the job ids cannot be read, recovery is aborted with an error log. If a single
+ * job graph cannot be recovered, the failure is logged and that job is skipped.
+ */
+ private void recoverJobs() {
+ log.info("Recovering all persisted jobs.");
+
+ // capture the session id now so that resubmissions are bound to this leader session
+ final UUID currentLeaderSessionId = leaderSessionId;
+
+ getRpcService().execute(
+ () -> {
+ final Collection<JobID> jobIds;
+
+ try {
+ jobIds = submittedJobGraphStore.getJobIds();
+ } catch (Exception e) {
+ log.error("Could not recover job ids from the submitted job graph store. Aborting recovery.", e);
+ return;
+ }
+
+ for (JobID jobId : jobIds) {
+ try {
+ SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
+
+ // NOTE(review): the future returned by submitJob is discarded, so a failed
+ // resubmission (e.g. a leader session id mismatch) is not logged here
+ runAsync(() -> submitJob(submittedJobGraph.getJobGraph(), currentLeaderSessionId, RpcUtils.INF_TIMEOUT));
+ } catch (Exception e) {
+ log.error("Could not recover the job graph for " + jobId + '.', e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Logs the given throwable together with this dispatcher's address and forwards it
+ * to the dispatcher's fatal error handler.
+ *
+ * @param throwable the fatal error
+ */
+ private void onFatalError(Throwable throwable) {
+ log.error("Fatal error occurred in dispatcher {}.", getAddress(), throwable);
+ fatalErrorHandler.onFatalError(throwable);
+ }
+
+ /**
+ * Checks that the given leader session id matches the dispatcher's current leader session id.
+ *
+ * @param leaderSessionID the leader session id sent along with the request
+ * @throws LeaderIdMismatchException if the dispatcher currently has no leader session id
+ *     or the given id differs from it
+ */
+ private void validateLeaderSessionId(UUID leaderSessionID) throws LeaderIdMismatchException {
+ if (this.leaderSessionId == null || !this.leaderSessionId.equals(leaderSessionID)) {
+ throw new LeaderIdMismatchException(this.leaderSessionId, leaderSessionID);
+ }
+ }
+
protected abstract JobManagerRunner createJobManagerRunner(
ResourceID resourceId,
JobGraph jobGraph,
@@ -237,6 +323,58 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
FatalErrorHandler fatalErrorHandler) throws Exception;
//------------------------------------------------------
+ // Leader contender
+ //------------------------------------------------------
+
+ /**
+ * Callback method when the dispatcher is granted leadership.
+ *
+ * <p>The work is scheduled via {@code runAsync}. It clears any state left over from a
+ * previous leader session, records the new leader session id, confirms that id to the
+ * leader election service on the RPC service's executor and triggers job recovery.
+ *
+ * @param newLeaderSessionID unique leadershipID
+ */
+ @Override
+ public void grantLeadership(final UUID newLeaderSessionID) {
+ runAsync(
+ () -> {
+ log.info("Dispatcher {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
+
+ // clear the state if we've been the leader before
+ if (leaderSessionId != null) {
+ clearState();
+ }
+
+ leaderSessionId = newLeaderSessionID;
+
+ // confirming the leader session ID might be blocking, therefore it is done on the
+ // RPC service's executor instead of inside this runAsync callback
+ getRpcService().execute(
+ () -> leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
+
+ recoverJobs();
+ });
+ }
+
+ /**
+ * Callback method when the dispatcher loses leadership.
+ *
+ * <p>The work is scheduled via {@code runAsync}. It resets the leader session id, so that
+ * requests still carrying the revoked leader session id (e.g. {@link #submitJob} calls,
+ * including pending recovery resubmissions) are rejected. It then shuts down all
+ * currently running jobs.
+ */
+ @Override
+ public void revokeLeadership() {
+ runAsync(
+ () -> {
+ log.info("Dispatcher {} was revoked leadership.", getAddress());
+
+ // we are no longer the leader: invalidate the old leader session id so that
+ // validateLeaderSessionId rejects requests addressed to the revoked session
+ leaderSessionId = null;
+
+ clearState();
+ });
+ }
+
+ /**
+ * Handles an error reported by the leader election service.
+ *
+ * <p>The error is treated as fatal. It is wrapped in a {@link DispatcherException} and
+ * passed to {@link #onFatalError(Throwable)}.
+ *
+ * @param exception Exception being thrown in the leader election service
+ */
+ @Override
+ public void handleError(final Exception exception) {
+ onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception));
+ }
+
+ //------------------------------------------------------
// Utility classes
//------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherException.java
new file mode 100644
index 0000000..cf4a493
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Base class for {@link Dispatcher} related exceptions.
+ */
+public class DispatcherException extends FlinkException {
+ private static final long serialVersionUID = 3781733042984381286L;
+
+ /**
+ * Creates a dispatcher exception with the given message.
+ *
+ * @param message describing the failure
+ */
+ public DispatcherException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a dispatcher exception with the given cause.
+ *
+ * @param cause of the failure
+ */
+ public DispatcherException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a dispatcher exception with the given message and cause.
+ *
+ * @param message describing the failure
+ * @param cause of the failure
+ */
+ public DispatcherException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 33b8a42..669f616 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import java.util.Collection;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
/**
@@ -37,11 +38,13 @@ public interface DispatcherGateway extends RpcGateway {
* Submit a job to the dispatcher.
*
* @param jobGraph JobGraph to submit
+ * @param leaderSessionId leader session id
* @param timeout RPC timeout
* @return A future acknowledge if the submission succeeded
*/
CompletableFuture<Acknowledge> submitJob(
JobGraph jobGraph,
+ UUID leaderSessionId,
@RpcTimeout Time timeout);
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index b44905e..defe5cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -73,6 +73,12 @@ public interface HighAvailabilityServices extends AutoCloseable {
LeaderRetrievalService getResourceManagerLeaderRetriever();
/**
+ * Gets the leader retriever for the dispatcher. This leader retrieval service
+ * is not always accessible.
+ *
+ * @return Leader retrieval service to retrieve the dispatcher leader
+ */
+ LeaderRetrievalService getDispatcherLeaderRetriever();
+
+ /**
* Gets the leader retriever for the job JobMaster which is responsible for the given job
*
* @param jobID The identifier of the job.
@@ -100,6 +106,13 @@ public interface HighAvailabilityServices extends AutoCloseable {
LeaderElectionService getResourceManagerLeaderElectionService();
/**
+ * Gets the leader election service for the cluster's dispatcher.
+ *
+ * @return Leader election service for the dispatcher leader election
+ */
+ LeaderElectionService getDispatcherLeaderElectionService();
+
+ /**
* Gets the leader election service for the given job.
*
* @param jobID The identifier of the job running the election.
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 2ebfd20..7a89ed8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
@@ -87,8 +88,17 @@ public class HighAvailabilityServicesUtils {
ResourceManager.RESOURCE_MANAGER_NAME,
addressResolution,
configuration);
+ final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
+ hostnamePort.f0,
+ hostnamePort.f1,
+ Dispatcher.DISPATCHER_NAME,
+ addressResolution,
+ configuration);
- return new StandaloneHaServices(resourceManagerRpcUrl, jobManagerRpcUrl);
+ return new StandaloneHaServices(
+ resourceManagerRpcUrl,
+ dispatcherRpcUrl,
+ jobManagerRpcUrl);
case ZOOKEEPER:
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
index 76eb681..4c30f87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
@@ -45,11 +45,14 @@ public class EmbeddedHaServices extends AbstractNonHaServices {
private final EmbeddedLeaderService resourceManagerLeaderService;
+ private final EmbeddedLeaderService dispatcherLeaderService;
+
private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
public EmbeddedHaServices(Executor executor) {
this.executor = Preconditions.checkNotNull(executor);
this.resourceManagerLeaderService = new EmbeddedLeaderService(executor);
+ this.dispatcherLeaderService = new EmbeddedLeaderService(executor);
this.jobManagerLeaderServices = new HashMap<>();
}
@@ -63,11 +66,21 @@ public class EmbeddedHaServices extends AbstractNonHaServices {
}
@Override
+ public LeaderRetrievalService getDispatcherLeaderRetriever() {
+ return dispatcherLeaderService.createLeaderRetrievalService();
+ }
+
+ @Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
return resourceManagerLeaderService.createLeaderElectionService();
}
@Override
+ public LeaderElectionService getDispatcherLeaderElectionService() {
+ return dispatcherLeaderService.createLeaderElectionService();
+ }
+
+ @Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
checkNotNull(jobID);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
index b3c6ee5..617b351 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
@@ -45,6 +45,9 @@ public class StandaloneHaServices extends AbstractNonHaServices {
/** The fix address of the ResourceManager */
private final String resourceManagerAddress;
+ /** The fix address of the Dispatcher */
+ private final String dispatcherAddress;
+
/** The fix address of the JobManager */
private final String jobManagerAddress;
@@ -53,8 +56,12 @@ public class StandaloneHaServices extends AbstractNonHaServices {
*
* @param resourceManagerAddress The fix address of the ResourceManager
+ * @param dispatcherAddress The fixed address of the Dispatcher
+ * @param jobManagerAddress The fixed address of the JobManager
*/
- public StandaloneHaServices(String resourceManagerAddress, String jobManagerAddress) {
+ public StandaloneHaServices(
+ String resourceManagerAddress,
+ String dispatcherAddress,
+ String jobManagerAddress) {
this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");
+ this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");
}
@@ -73,6 +80,15 @@ public class StandaloneHaServices extends AbstractNonHaServices {
}
@Override
+ public LeaderRetrievalService getDispatcherLeaderRetriever() {
+ synchronized (lock) {
+ checkNotShutdown();
+
+ return new StandaloneLeaderRetrievalService(dispatcherAddress, DEFAULT_LEADER_ID);
+ }
+ }
+
+ @Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
synchronized (lock) {
checkNotShutdown();
@@ -82,6 +98,15 @@ public class StandaloneHaServices extends AbstractNonHaServices {
}
@Override
+ public LeaderElectionService getDispatcherLeaderElectionService() {
+ synchronized (lock) {
+ checkNotShutdown();
+
+ return new StandaloneLeaderElectionService();
+ }
+ }
+
+ @Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
synchronized (lock) {
checkNotShutdown();
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 9dabfa2..04ab6d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -82,6 +82,8 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
+ private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";
+
private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
// ------------------------------------------------------------------------
@@ -125,6 +127,11 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
}
@Override
+ public LeaderRetrievalService getDispatcherLeaderRetriever() {
+ return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
+ }
+
+ @Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
}
@@ -140,6 +147,11 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
}
@Override
+ public LeaderElectionService getDispatcherLeaderElectionService() {
+ return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
+ }
+
+ @Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
index 9738970..a644efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
@@ -18,13 +18,21 @@
package org.apache.flink.runtime.rpc;
+import org.apache.flink.api.common.time.Time;
+
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Utility functions for Flink's RPC implementation
*/
public class RpcUtils {
+
+ /** Timeout of {@code Long.MAX_VALUE} milliseconds, i.e. an operation that should effectively never time out. */
+ public static final Time INF_TIMEOUT = Time.milliseconds(Long.MAX_VALUE);
+
/**
* Extracts all {@link RpcGateway} interfaces implemented by the given clazz.
*
@@ -47,6 +55,20 @@ public class RpcUtils {
return interfaces;
}
+ /**
+ * Shuts the given {@link RpcEndpoint} down and awaits its termination.
+ *
+ * <p>If the termination future does not complete within the timeout, a
+ * {@link TimeoutException} is thrown. The endpoint may still be shutting down at that point.
+ *
+ * @param rpcEndpoint to terminate
+ * @param timeout for this operation
+ * @throws ExecutionException if a problem occurs
+ * @throws InterruptedException if the operation has been interrupted
+ * @throws TimeoutException if a timeout occurred
+ */
+ public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout) throws ExecutionException, InterruptedException, TimeoutException {
+ rpcEndpoint.shutDown();
+ // wait on the endpoint's termination future, bounded by the given timeout
+ rpcEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+
// We don't want this class to be instantiable
private RpcUtils() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 4237327..091608c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -25,24 +25,33 @@ import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -52,6 +61,23 @@ import static org.mockito.Mockito.when;
*/
public class DispatcherTest extends TestLogger {
+ /** RPC service shared by all tests of this class; created before and stopped after all tests. */
+ private static RpcService rpcService;
+ /** Timeout for RPC calls, future waits and endpoint termination in these tests. */
+ private static final Time timeout = Time.seconds(10L);
+
+ @BeforeClass
+ public static void setup() {
+ rpcService = new TestingRpcService();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (rpcService != null) {
+ rpcService.stopService();
+
+ rpcService = null;
+ }
+ }
+
/**
* Tests that we can submit a job to the Dispatcher which then spawns a
* new JobManagerRunner.
@@ -59,34 +85,35 @@ public class DispatcherTest extends TestLogger {
@Test
public void testJobSubmission() throws Exception {
TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
- RpcService rpcService = new TestingRpcService();
- HighAvailabilityServices haServices = new StandaloneHaServices("localhost", "localhost");
+ HighAvailabilityServices haServices = new StandaloneHaServices(
+ "localhost",
+ "localhost",
+ "localhost");
HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
JobManagerRunner jobManagerRunner = mock(JobManagerRunner.class);
- final Time timeout = Time.seconds(5L);
final JobGraph jobGraph = mock(JobGraph.class);
final JobID jobId = new JobID();
when(jobGraph.getJobID()).thenReturn(jobId);
- try {
- final TestingDispatcher dispatcher = new TestingDispatcher(
- rpcService,
- Dispatcher.DISPATCHER_NAME,
- new Configuration(),
- haServices,
- mock(BlobServer.class),
- heartbeatServices,
- mock(MetricRegistry.class),
- fatalErrorHandler,
- jobManagerRunner,
- jobId);
+ final TestingDispatcher dispatcher = new TestingDispatcher(
+ rpcService,
+ Dispatcher.DISPATCHER_NAME,
+ new Configuration(),
+ haServices,
+ mock(BlobServer.class),
+ heartbeatServices,
+ mock(MetricRegistry.class),
+ fatalErrorHandler,
+ jobManagerRunner,
+ jobId);
+ try {
dispatcher.start();
DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
- CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+ CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, HighAvailabilityServices.DEFAULT_LEADER_ID, timeout);
acknowledgeFuture.get();
@@ -95,7 +122,60 @@ public class DispatcherTest extends TestLogger {
// check that no error has occurred
fatalErrorHandler.rethrowError();
} finally {
- rpcService.stopService();
+ RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+ }
+ }
+
+ /**
+ * Tests that the dispatcher takes part in the leader election.
+ *
+ * <p>After being granted leadership, the dispatcher must confirm the granted leader
+ * session id and start recovering jobs from the submitted job graph store. No error
+ * may be reported to the fatal error handler.
+ */
+ @Test
+ public void testLeaderElection() throws Exception {
+ TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+ TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+
+ UUID expectedLeaderSessionId = UUID.randomUUID();
+ CompletableFuture<UUID> leaderSessionIdFuture = new CompletableFuture<>();
+ SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class);
+ // completes the future once the dispatcher confirms its leader session id
+ TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService() {
+ @Override
+ public void confirmLeaderSessionID(UUID leaderSessionId) {
+ super.confirmLeaderSessionID(leaderSessionId);
+ leaderSessionIdFuture.complete(leaderSessionId);
+ }
+ };
+
+ haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
+ haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
+ HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
+ final JobID jobId = new JobID();
+
+ final TestingDispatcher dispatcher = new TestingDispatcher(
+ rpcService,
+ Dispatcher.DISPATCHER_NAME,
+ new Configuration(),
+ haServices,
+ mock(BlobServer.class),
+ heartbeatServices,
+ mock(MetricRegistry.class),
+ fatalErrorHandler,
+ mock(JobManagerRunner.class),
+ jobId);
+
+ try {
+ dispatcher.start();
+
+ assertFalse(leaderSessionIdFuture.isDone());
+
+ testingLeaderElectionService.isLeader(expectedLeaderSessionId);
+
+ UUID actualLeaderSessionId = leaderSessionIdFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
+
+ verify(mockSubmittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds();
+
+ // check that no error has occurred
+ fatalErrorHandler.rethrowError();
+ } finally {
+ RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 0a7e9c8..dba7bef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -38,12 +38,16 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
+ private volatile LeaderRetrievalService dispatcherLeaderRetriever;
+
private ConcurrentHashMap<JobID, LeaderRetrievalService> jobMasterLeaderRetrievers = new ConcurrentHashMap<>();
private ConcurrentHashMap<JobID, LeaderElectionService> jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
private volatile LeaderElectionService resourceManagerLeaderElectionService;
+ private volatile LeaderElectionService dispatcherLeaderElectionService;
+
private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
private volatile SubmittedJobGraphStore submittedJobGraphStore;
@@ -56,6 +60,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
}
+ public void setDispatcherLeaderRetriever(LeaderRetrievalService dispatcherLeaderRetriever) {
+ // Written to a volatile field; getDispatcherLeaderRetriever() throws IllegalStateException while it is null.
+ this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+ }
+
public void setJobMasterLeaderRetriever(JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) {
this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
}
@@ -68,6 +76,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
this.resourceManagerLeaderElectionService = leaderElectionService;
}
+ public void setDispatcherLeaderElectionService(LeaderElectionService leaderElectionService) {
+ // Written to a volatile field; getDispatcherLeaderElectionService() throws IllegalStateException while it is null.
+ this.dispatcherLeaderElectionService = leaderElectionService;
+ }
+
public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) {
this.checkpointRecoveryFactory = checkpointRecoveryFactory;
}
@@ -91,6 +103,16 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
+ public LeaderRetrievalService getDispatcherLeaderRetriever() {
+ // Read the volatile field once so the null check and the return observe the same value.
+ LeaderRetrievalService service = this.dispatcherLeaderRetriever;
+ if (service != null) {
+ return service;
+ } else {
+ // Must name the dispatcher retriever (was a copy-paste of the resource manager message).
+ throw new IllegalStateException("DispatcherLeaderRetriever has not been set");
+ }
+ }
+
+ @Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
if (service != null) {
@@ -117,6 +139,17 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
+ public LeaderElectionService getDispatcherLeaderElectionService() {
+ // Single read of the volatile field; fail fast when the test did not configure it.
+ final LeaderElectionService electionService = dispatcherLeaderElectionService;
+ if (electionService == null) {
+ throw new IllegalStateException("DispatcherLeaderElectionService has not been set");
+ }
+ return electionService;
+ }
+
+ @Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
index 0735d17..1f319eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
@@ -45,9 +45,12 @@ public class TestingManualHighAvailabilityServices implements HighAvailabilitySe
private final ManualLeaderService resourceManagerLeaderService;
+ private final ManualLeaderService dispatcherLeaderService;
+
public TestingManualHighAvailabilityServices() {
jobManagerLeaderServices = new HashMap<>(4);
resourceManagerLeaderService = new ManualLeaderService();
+ dispatcherLeaderService = new ManualLeaderService();
}
@Override
@@ -56,6 +59,11 @@ public class TestingManualHighAvailabilityServices implements HighAvailabilitySe
}
@Override
+ public LeaderRetrievalService getDispatcherLeaderRetriever() {
+ // Delegates to the dispatcher ManualLeaderService created in the constructor.
+ final LeaderRetrievalService retrievalService = dispatcherLeaderService.createLeaderRetrievalService();
+ return retrievalService;
+ }
+
+ @Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
ManualLeaderService leaderService = getOrCreateJobManagerLeaderService(jobID);
@@ -73,6 +81,11 @@ public class TestingManualHighAvailabilityServices implements HighAvailabilitySe
}
@Override
+ public LeaderElectionService getDispatcherLeaderElectionService() {
+ // Delegates to the dispatcher ManualLeaderService created in the constructor.
+ final LeaderElectionService electionService = dispatcherLeaderService.createLeaderElectionService();
+ return electionService;
+ }
+
+ @Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
ManualLeaderService leaderService = getOrCreateJobManagerLeaderService(jobID);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
index 2d51360..1cf2e5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
@@ -39,6 +39,7 @@ import static org.mockito.Mockito.verify;
public class StandaloneHaServicesTest extends TestLogger {
private final String jobManagerAddress = "jobManager";
+ private final String dispatcherAddress = "dispatcher";
private final String resourceManagerAddress = "resourceManager";
private StandaloneHaServices standaloneHaServices;
@@ -46,7 +47,10 @@ public class StandaloneHaServicesTest extends TestLogger {
@Before
public void setupTest() {
- standaloneHaServices = new StandaloneHaServices(resourceManagerAddress, jobManagerAddress);
+ standaloneHaServices = new StandaloneHaServices(
+ resourceManagerAddress,
+ dispatcherAddress,
+ jobManagerAddress);
}
@After
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 6842bee..e622130 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -462,6 +462,7 @@ public class TaskExecutorTest extends TestLogger {
final ResourceID resourceID = ResourceID.generate();
final String resourceManagerAddress = "/resource/manager/address/one";
final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
+ final String dispatcherAddress = "localhost";
final String jobManagerAddress = "localhost";
final TestingRpcService rpc = new TestingRpcService();
@@ -483,6 +484,7 @@ public class TaskExecutorTest extends TestLogger {
StandaloneHaServices haServices = new StandaloneHaServices(
resourceManagerAddress,
+ dispatcherAddress,
jobManagerAddress);
final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
index 75f8c0a..86db1c4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
@@ -71,6 +71,9 @@ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
/** The embedded leader election service used by JobManagers to find the resource manager. */
private final SingleLeaderElectionService resourceManagerLeaderElectionService;
+ /** The embedded leader election service for the dispatcher. */
+ private final SingleLeaderElectionService dispatcherLeaderElectionService;
+
// ------------------------------------------------------------------------
/**
@@ -100,6 +103,7 @@ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
try {
this.dispatcher = Executors.newSingleThreadExecutor(new ServicesThreadFactory());
this.resourceManagerLeaderElectionService = new SingleLeaderElectionService(dispatcher, DEFAULT_LEADER_ID);
+ this.dispatcherLeaderElectionService = new SingleLeaderElectionService(dispatcher, DEFAULT_LEADER_ID);
// all good!
successful = true;
@@ -130,6 +134,17 @@ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
}
@Override
+ public LeaderRetrievalService getDispatcherLeaderRetriever() {
+ // enter()/exit() bracket the access (presumably a guard against use after shutdown - confirm).
+ enter();
+ try {
+ // The retriever is derived from the embedded single-leader election service of the dispatcher.
+ final LeaderRetrievalService retrievalService = dispatcherLeaderElectionService.createLeaderRetrievalService();
+ return retrievalService;
+ } finally {
+ exit();
+ }
+ }
+
+ @Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
enter();
try {
@@ -141,6 +156,16 @@ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
}
@Override
+ public LeaderElectionService getDispatcherLeaderElectionService() {
+ // enter()/exit() bracket the access (presumably a guard against use after shutdown - confirm).
+ enter();
+ try {
+ // The same SingleLeaderElectionService instance is handed out on every call.
+ final LeaderElectionService electionService = dispatcherLeaderElectionService;
+ return electionService;
+ } finally {
+ exit();
+ }
+ }
+
+ @Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
enter();
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
index 6686a52..c1466d2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn.highavailability;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -66,6 +67,9 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
/** The RPC URL under which the single ResourceManager can be reached while available. */
private final String resourceManagerRpcUrl;
+ /** The RPC URL under which the single Dispatcher can be reached while available. */
+ private final String dispatcherRpcUrl;
+
// ------------------------------------------------------------------------
/**
@@ -116,6 +120,13 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
addressResolution,
config);
+ this.dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
+ rmHost,
+ rmPort,
+ Dispatcher.DISPATCHER_NAME,
+ addressResolution,
+ config);
+
// all well!
successful = true;
}
@@ -145,6 +156,17 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
}
@Override
+ public LeaderRetrievalService getDispatcherLeaderRetriever() {
+ // enter()/exit() bracket the access (presumably a guard against use after shutdown - confirm).
+ enter();
+ try {
+ // Fixed address computed in the constructor, paired with the default leader id.
+ final LeaderRetrievalService retrievalService =
+ new StandaloneLeaderRetrievalService(dispatcherRpcUrl, DEFAULT_LEADER_ID);
+ return retrievalService;
+ } finally {
+ exit();
+ }
+ }
+
+ @Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
enter();
try {
@@ -156,6 +178,16 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
}
@Override
+ public LeaderElectionService getDispatcherLeaderElectionService() {
+ // enter()/exit() still bracket the call so the exception is raised inside the guarded section.
+ enter();
+ try {
+ final String reason = "Not supported on the TaskManager side";
+ throw new UnsupportedOperationException(reason);
+ } finally {
+ exit();
+ }
+ }
+
+ @Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
enter();
try {