You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:48:22 UTC
[02/50] [abbrv] flink git commit: [FLINK-4400] [cluster mngmt]
Implement leadership election among JobMasters
[FLINK-4400] [cluster mngmt] Implement leadership election among JobMasters
Adapt related components to the changes in HighAvailabilityServices
Add comments for getJobMasterElectionService in HighAvailabilityServices
This closes #2377.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cb3d157
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cb3d157
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cb3d157
Branch: refs/heads/flip-6
Commit: 4cb3d1572a2473c15ef6ae0ef9dc07d8cd23c9fe
Parents: 2d50cc0
Author: xiaogang.sxg <xi...@alibaba-inc.com>
Authored: Wed Aug 17 13:46:00 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:38 2016 +0200
----------------------------------------------------------------------
.../HighAvailabilityServices.java | 9 +
.../runtime/highavailability/NonHaServices.java | 8 +
.../flink/runtime/rpc/jobmaster/JobMaster.java | 318 +++++++++----------
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 53 +---
4 files changed, 179 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb3d157/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 094d36f..73e4f1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.highavailability;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
/**
@@ -36,4 +38,11 @@ public interface HighAvailabilityServices {
* Gets the leader retriever for the cluster's resource manager.
*/
LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
+
+ /**
+ * Gets the leader election service for the given job.
+ *
+ * @param jobID The identifier of the job running the election.
+ */
+ LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb3d157/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index b8c2ed8..3d2769b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.highavailability;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
@@ -56,4 +59,9 @@ public class NonHaServices implements HighAvailabilityServices {
public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
}
+
+ @Override
+ public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
+ return new StandaloneLeaderElectionService();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb3d157/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
index e53cd68..49b200b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
@@ -18,68 +18,77 @@
package org.apache.flink.runtime.rpc.jobmaster;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
-import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
-import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.Preconditions;
-import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
/**
* JobMaster implementation. The job master is responsible for the execution of a single
* {@link org.apache.flink.runtime.jobgraph.JobGraph}.
- *
+ * <p>
* It offers the following methods as part of its rpc interface to interact with the JobMaster
* remotely:
* <ul>
- * <li>{@link #registerAtResourceManager(String)} triggers the registration at the resource manager</li>
* <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
* given task</li>
* </ul>
*/
public class JobMaster extends RpcEndpoint<JobMasterGateway> {
- /** Execution context for future callbacks */
- private final ExecutionContext executionContext;
-
- /** Execution context for scheduled runnables */
- private final ScheduledExecutorService scheduledExecutorService;
-
- private final FiniteDuration initialRegistrationTimeout = new FiniteDuration(500, TimeUnit.MILLISECONDS);
- private final FiniteDuration maxRegistrationTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
- private final FiniteDuration registrationDuration = new FiniteDuration(365, TimeUnit.DAYS);
- private final long failedRegistrationDelay = 10000;
/** Gateway to connected resource manager, null iff not connected */
private ResourceManagerGateway resourceManager = null;
- /** UUID to filter out old registration runs */
- private UUID currentRegistrationRun;
+ /** Logical representation of the job */
+ private final JobGraph jobGraph;
+ private final JobID jobID;
+
+ /** Configuration of the job */
+ private final Configuration configuration;
+ private final RecoveryMode recoveryMode;
+
+ /** Service to contend for and retrieve the leadership of JM and RM */
+ private final HighAvailabilityServices highAvailabilityServices;
+
+ /** Leader Management */
+ private LeaderElectionService leaderElectionService = null;
+ private UUID leaderSessionID;
+
+ /**
+ * The JM's Constructor
+ *
+ * @param jobGraph The representation of the job's execution plan
+ * @param configuration The job's configuration
+ * @param rpcService The RPC service at which the JM serves
+ * @param highAvailabilityService The cluster's HA service from which the JM can elect and retrieve leaders.
+ */
+ public JobMaster(
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityService) {
- public JobMaster(RpcService rpcService, ExecutorService executorService) {
super(rpcService);
- executionContext = ExecutionContext$.MODULE$.fromExecutor(
- Preconditions.checkNotNull(executorService));
- scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+ this.jobGraph = Preconditions.checkNotNull(jobGraph);
+ this.jobID = Preconditions.checkNotNull(jobGraph.getJobID());
+
+ this.configuration = Preconditions.checkNotNull(configuration);
+ this.recoveryMode = RecoveryMode.fromConfig(configuration);
+
+ this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityService);
}
public ResourceManagerGateway getResourceManager() {
@@ -87,6 +96,91 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
//----------------------------------------------------------------------------------------------
+ // Initialization methods
+ //----------------------------------------------------------------------------------------------
+ public void start() {
+ super.start();
+
+ // register at the election once the JM starts
+ registerAtElectionService();
+ }
+
+
+ //----------------------------------------------------------------------------------------------
+ // JobMaster Leadership methods
+ //----------------------------------------------------------------------------------------------
+
+ /**
+ * Retrieves the election service and contends for the leadership.
+ */
+ private void registerAtElectionService() {
+ try {
+ leaderElectionService = highAvailabilityServices.getJobMasterLeaderElectionService(jobID);
+ leaderElectionService.start(new JobMasterLeaderContender());
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to register at the election of JobMaster", e);
+ }
+ }
+
+ /**
+ * Start the execution when the leadership is granted.
+ *
+ * @param newLeaderSessionID The identifier of the new leadership session
+ */
+ public void grantJobMasterLeadership(final UUID newLeaderSessionID) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("JobManager {} grants leadership with session id {}.", getAddress(), newLeaderSessionID);
+
+ // The operation may be blocking, but since the JM is idle until it is granted the leadership, it's okay that
+ // the JM waits here for the operation's completion.
+ leaderSessionID = newLeaderSessionID;
+ leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+
+ // TODO:: execute the job when the leadership is granted.
+ }
+ });
+ }
+
+ /**
+ * Stop the execution when the leadership is revoked.
+ */
+ public void revokeJobMasterLeadership() {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("JobManager {} was revoked leadership.", getAddress());
+
+ // TODO:: cancel the job's execution and notify all listeners
+ cancelAndClearEverything(new Exception("JobManager is no longer the leader."));
+
+ leaderSessionID = null;
+ }
+ });
+ }
+
+ /**
+ * Handles errors occurring in the leader election service.
+ *
+ * @param exception Exception thrown in the leader election service
+ */
+ public void onJobMasterElectionError(final Exception exception) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.error("Received an error from the LeaderElectionService.", exception);
+
+ // TODO:: cancel the job's execution and shutdown the JM
+ cancelAndClearEverything(exception);
+
+ leaderSessionID = null;
+ }
+ });
+
+ }
+
+ //----------------------------------------------------------------------------------------------
// RPC methods
//----------------------------------------------------------------------------------------------
@@ -109,18 +203,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
*/
@RpcMethod
public void registerAtResourceManager(final String address) {
- currentRegistrationRun = UUID.randomUUID();
-
- Future<ResourceManagerGateway> resourceManagerFuture = getRpcService().connect(address, ResourceManagerGateway.class);
-
- handleResourceManagerRegistration(
- new JobMasterRegistration(getAddress()),
- 1,
- resourceManagerFuture,
- currentRegistrationRun,
- initialRegistrationTimeout,
- maxRegistrationTimeout,
- registrationDuration.fromNow());
+ //TODO:: register at the RM
}
//----------------------------------------------------------------------------------------------
@@ -128,124 +211,37 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
//----------------------------------------------------------------------------------------------
/**
- * Helper method to handle the resource manager registration process. If a registration attempt
- * times out, then a new attempt with the doubled time out is initiated. The whole registration
- * process has a deadline. Once this deadline is overdue without successful registration, the
- * job master shuts down.
+ * Cancel the current job and notify all listeners of the job's cancellation.
*
- * @param jobMasterRegistration Job master registration info which is sent to the resource
- * manager
- * @param attemptNumber Registration attempt number
- * @param resourceManagerFuture Future of the resource manager gateway
- * @param registrationRun UUID describing the current registration run
- * @param timeout Timeout of the last registration attempt
- * @param maxTimeout Maximum timeout between registration attempts
- * @param deadline Deadline for the registration
+ * @param cause Cause for the cancelling.
*/
- void handleResourceManagerRegistration(
- final JobMasterRegistration jobMasterRegistration,
- final int attemptNumber,
- final Future<ResourceManagerGateway> resourceManagerFuture,
- final UUID registrationRun,
- final FiniteDuration timeout,
- final FiniteDuration maxTimeout,
- final Deadline deadline) {
-
- // filter out concurrent registration runs
- if (registrationRun.equals(currentRegistrationRun)) {
-
- log.info("Start registration attempt #{}.", attemptNumber);
-
- if (deadline.isOverdue()) {
- // we've exceeded our registration deadline. This means that we have to shutdown the JobMaster
- log.error("Exceeded registration deadline without successfully registering at the ResourceManager.");
- shutDown();
- } else {
- Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> registrationResponseFuture = resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, Future<Tuple2<RegistrationResponse, ResourceManagerGateway>>>() {
- @Override
- public Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> apply(ResourceManagerGateway resourceManagerGateway) {
- return resourceManagerGateway.registerJobMaster(jobMasterRegistration, timeout).zip(Futures.successful(resourceManagerGateway));
- }
- }, executionContext);
-
- registrationResponseFuture.onComplete(new OnComplete<Tuple2<RegistrationResponse, ResourceManagerGateway>>() {
- @Override
- public void onComplete(Throwable failure, Tuple2<RegistrationResponse, ResourceManagerGateway> tuple) throws Throwable {
- if (failure != null) {
- if (failure instanceof TimeoutException) {
- // we haven't received an answer in the given timeout interval,
- // so increase it and try again.
- final FiniteDuration newTimeout = timeout.$times(2L).min(maxTimeout);
-
- handleResourceManagerRegistration(
- jobMasterRegistration,
- attemptNumber + 1,
- resourceManagerFuture,
- registrationRun,
- newTimeout,
- maxTimeout,
- deadline);
- } else {
- log.error("Received unknown error while registering at the ResourceManager.", failure);
- shutDown();
- }
- } else {
- final RegistrationResponse response = tuple._1();
- final ResourceManagerGateway gateway = tuple._2();
-
- if (response.isSuccess()) {
- finishResourceManagerRegistration(gateway, response.getInstanceID());
- } else {
- log.info("The registration was refused. Try again.");
-
- scheduledExecutorService.schedule(new Runnable() {
- @Override
- public void run() {
- // we have to execute scheduled runnable in the main thread
- // because we need consistency wrt currentRegistrationRun
- runAsync(new Runnable() {
- @Override
- public void run() {
- // our registration attempt was refused. Start over.
- handleResourceManagerRegistration(
- jobMasterRegistration,
- 1,
- resourceManagerFuture,
- registrationRun,
- initialRegistrationTimeout,
- maxTimeout,
- deadline);
- }
- });
- }
- }, failedRegistrationDelay, TimeUnit.MILLISECONDS);
- }
- }
- }
- }, getMainThreadExecutionContext()); // use the main thread execution context to execute the call back in the main thread
- }
- } else {
- log.info("Discard out-dated registration run.");
- }
+ private void cancelAndClearEverything(Throwable cause) {
+ // currently, nothing to do here
}
- /**
- * Finish the resource manager registration by setting the new resource manager gateway.
- *
- * @param resourceManager New resource manager gateway
- * @param instanceID Instance id assigned by the resource manager
- */
- void finishResourceManagerRegistration(ResourceManagerGateway resourceManager, InstanceID instanceID) {
- log.info("Successfully registered at the ResourceManager under instance id {}.", instanceID);
- this.resourceManager = resourceManager;
- }
+ // ------------------------------------------------------------------------
+ // Utility classes
+ // ------------------------------------------------------------------------
+ private class JobMasterLeaderContender implements LeaderContender {
- /**
- * Return if the job master is connected to a resource manager.
- *
- * @return true if the job master is connected to the resource manager
- */
- public boolean isConnected() {
- return resourceManager != null;
+ @Override
+ public void grantLeadership(UUID leaderSessionID) {
+ JobMaster.this.grantJobMasterLeadership(leaderSessionID);
+ }
+
+ @Override
+ public void revokeLeadership() {
+ JobMaster.this.revokeJobMasterLeadership();
+ }
+
+ @Override
+ public String getAddress() {
+ return JobMaster.this.getAddress();
+ }
+
+ @Override
+ public void handleError(Exception exception) {
+ onJobMasterElectionError(exception);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb3d157/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 7b4ab89..2790cf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
import akka.util.Timeout;
-
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
@@ -31,6 +34,7 @@ import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Test;
+import org.mockito.Mockito;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
@@ -80,51 +84,4 @@ public class AkkaRpcServiceTest extends TestLogger {
assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
}
-
- // ------------------------------------------------------------------------
- // specific component tests - should be moved to the test classes
- // for those components
- // ------------------------------------------------------------------------
-
- /**
- * Tests that the {@link JobMaster} can connect to the {@link ResourceManager} using the
- * {@link AkkaRpcService}.
- */
- @Test
- public void testJobMasterResourceManagerRegistration() throws Exception {
- Timeout akkaTimeout = new Timeout(10, TimeUnit.SECONDS);
- ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
- ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
- AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, akkaTimeout);
- AkkaRpcService akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaTimeout);
- ExecutorService executorService = new ForkJoinPool();
-
- ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService);
- JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService);
-
- resourceManager.start();
- jobMaster.start();
-
- ResourceManagerGateway rm = resourceManager.getSelf();
-
- assertTrue(rm instanceof AkkaGateway);
-
- AkkaGateway akkaClient = (AkkaGateway) rm;
-
-
- jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getRpcEndpoint()));
-
- // wait for successful registration
- FiniteDuration timeout = new FiniteDuration(200, TimeUnit.SECONDS);
- Deadline deadline = timeout.fromNow();
-
- while (deadline.hasTimeLeft() && !jobMaster.isConnected()) {
- Thread.sleep(100);
- }
-
- assertFalse(deadline.isOverdue());
-
- jobMaster.shutDown();
- resourceManager.shutDown();
- }
}