You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:48:22 UTC

[02/50] [abbrv] flink git commit: [FLINK-4400] [cluster mngmt] Implement leadership election among JobMasters

[FLINK-4400] [cluster mngmt] Implement leadership election among JobMasters

Adapt related components to the changes in HighAvailabilityServices

Add comments for getJobMasterElectionService in HighAvailabilityServices

This closes #2377.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cb3d157
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cb3d157
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cb3d157

Branch: refs/heads/flip-6
Commit: 4cb3d1572a2473c15ef6ae0ef9dc07d8cd23c9fe
Parents: 2d50cc0
Author: xiaogang.sxg <xi...@alibaba-inc.com>
Authored: Wed Aug 17 13:46:00 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:38 2016 +0200

----------------------------------------------------------------------
 .../HighAvailabilityServices.java               |   9 +
 .../runtime/highavailability/NonHaServices.java |   8 +
 .../flink/runtime/rpc/jobmaster/JobMaster.java  | 318 +++++++++----------
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  53 +---
 4 files changed, 179 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4cb3d157/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 094d36f..73e4f1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.highavailability;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 /**
@@ -36,4 +38,11 @@ public interface HighAvailabilityServices {
 	 * Gets the leader retriever for the cluster's resource manager.
 	 */
 	LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
+
+	/**
+	 * Gets the leader election service for the given job.
+	 *
+	 * @param jobID The identifier of the job running the election.
+	 */
+	LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb3d157/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index b8c2ed8..3d2769b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.highavailability;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 
@@ -56,4 +59,9 @@ public class NonHaServices implements HighAvailabilityServices {
 	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
 		return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
 	}
+
+	@Override
+	public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
+		return new StandaloneLeaderElectionService();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb3d157/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
index e53cd68..49b200b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
@@ -18,68 +18,77 @@
 
 package org.apache.flink.runtime.rpc.jobmaster;
 
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
-import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
-import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.Preconditions;
-import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /**
  * JobMaster implementation. The job master is responsible for the execution of a single
  * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
- *
+ * <p>
  * It offers the following methods as part of its rpc interface to interact with the JobMaster
  * remotely:
  * <ul>
- *     <li>{@link #registerAtResourceManager(String)} triggers the registration at the resource manager</li>
  *     <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
  * given task</li>
  * </ul>
  */
 public class JobMaster extends RpcEndpoint<JobMasterGateway> {
-	/** Execution context for future callbacks */
-	private final ExecutionContext executionContext;
-
-	/** Execution context for scheduled runnables */
-	private final ScheduledExecutorService scheduledExecutorService;
-
-	private final FiniteDuration initialRegistrationTimeout = new FiniteDuration(500, TimeUnit.MILLISECONDS);
-	private final FiniteDuration maxRegistrationTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-	private final FiniteDuration registrationDuration = new FiniteDuration(365, TimeUnit.DAYS);
-	private final long failedRegistrationDelay = 10000;
 
 	/** Gateway to connected resource manager, null iff not connected */
 	private ResourceManagerGateway resourceManager = null;
 
-	/** UUID to filter out old registration runs */
-	private UUID currentRegistrationRun;
+	/** Logical representation of the job */
+	private final JobGraph jobGraph;
+	private final JobID jobID;
+
+	/** Configuration of the job */
+	private final Configuration configuration;
+	private final RecoveryMode recoveryMode;
+
+	/** Service to contend for and retrieve the leadership of JM and RM */
+	private final HighAvailabilityServices highAvailabilityServices;
+
+	/** Leader Management */
+	private LeaderElectionService leaderElectionService = null;
+	private UUID leaderSessionID;
+
+	/**
+	 * The JM's Constructor
+	 *
+	 * @param jobGraph The representation of the job's execution plan
+	 * @param configuration The job's configuration
+	 * @param rpcService The RPC service at which the JM serves
+	 * @param highAvailabilityService The cluster's HA service from which the JM can elect and retrieve leaders.
+	 */
+	public JobMaster(
+		JobGraph jobGraph,
+		Configuration configuration,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityService) {
 
-	public JobMaster(RpcService rpcService, ExecutorService executorService) {
 		super(rpcService);
-		executionContext = ExecutionContext$.MODULE$.fromExecutor(
-			Preconditions.checkNotNull(executorService));
-		scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+		this.jobGraph = Preconditions.checkNotNull(jobGraph);
+		this.jobID = Preconditions.checkNotNull(jobGraph.getJobID());
+
+		this.configuration = Preconditions.checkNotNull(configuration);
+		this.recoveryMode = RecoveryMode.fromConfig(configuration);
+
+		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityService);
 	}
 
 	public ResourceManagerGateway getResourceManager() {
@@ -87,6 +96,91 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	//----------------------------------------------------------------------------------------------
+	// Initialization methods
+	//----------------------------------------------------------------------------------------------
+	public void start() {
+		super.start();
+
+		// register at the election once the JM starts
+		registerAtElectionService();
+	}
+
+
+	//----------------------------------------------------------------------------------------------
+	// JobMaster Leadership methods
+	//----------------------------------------------------------------------------------------------
+
+	/**
+	 * Retrieves the election service and contends for the leadership.
+	 */
+	private void registerAtElectionService() {
+		try {
+			leaderElectionService = highAvailabilityServices.getJobMasterLeaderElectionService(jobID);
+			leaderElectionService.start(new JobMasterLeaderContender());
+		} catch (Exception e) {
+			throw new RuntimeException("Fail to register at the election of JobMaster", e);
+		}
+	}
+
+	/**
+	 * Start the execution when the leadership is granted.
+	 *
+	 * @param newLeaderSessionID The identifier of the new leadership session
+	 */
+	public void grantJobMasterLeadership(final UUID newLeaderSessionID) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.info("JobManager {} grants leadership with session id {}.", getAddress(), newLeaderSessionID);
+
+				// The operation may be blocking, but since the JM is idle before it is granted the leadership, it's okay that
+				// the JM waits here for the operation's completion.
+				leaderSessionID = newLeaderSessionID;
+				leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+
+				// TODO:: execute the job when the leadership is granted.
+			}
+		});
+	}
+
+	/**
+	 * Stop the execution when the leadership is revoked.
+	 */
+	public void revokeJobMasterLeadership() {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.info("JobManager {} was revoked leadership.", getAddress());
+
+				// TODO:: cancel the job's execution and notify all listeners
+				cancelAndClearEverything(new Exception("JobManager is no longer the leader."));
+
+				leaderSessionID = null;
+			}
+		});
+	}
+
+	/**
+	 * Handles errors occurring in the leader election service.
+	 *
+	 * @param exception Exception thrown in the leader election service
+	 */
+	public void onJobMasterElectionError(final Exception exception) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.error("Received an error from the LeaderElectionService.", exception);
+
+				// TODO:: cancel the job's execution and shutdown the JM
+				cancelAndClearEverything(exception);
+
+				leaderSessionID = null;
+			}
+		});
+
+	}
+
+	//----------------------------------------------------------------------------------------------
 	// RPC methods
 	//----------------------------------------------------------------------------------------------
 
@@ -109,18 +203,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 */
 	@RpcMethod
 	public void registerAtResourceManager(final String address) {
-		currentRegistrationRun = UUID.randomUUID();
-
-		Future<ResourceManagerGateway> resourceManagerFuture = getRpcService().connect(address, ResourceManagerGateway.class);
-
-		handleResourceManagerRegistration(
-			new JobMasterRegistration(getAddress()),
-			1,
-			resourceManagerFuture,
-			currentRegistrationRun,
-			initialRegistrationTimeout,
-			maxRegistrationTimeout,
-			registrationDuration.fromNow());
+		//TODO:: register at the RM
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -128,124 +211,37 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	//----------------------------------------------------------------------------------------------
 
 	/**
-	 * Helper method to handle the resource manager registration process. If a registration attempt
-	 * times out, then a new attempt with the doubled time out is initiated. The whole registration
-	 * process has a deadline. Once this deadline is overdue without successful registration, the
-	 * job master shuts down.
+	 * Cancels the current job and notifies all listeners of the job's cancellation.
 	 *
-	 * @param jobMasterRegistration Job master registration info which is sent to the resource
-	 *                              manager
-	 * @param attemptNumber Registration attempt number
-	 * @param resourceManagerFuture Future of the resource manager gateway
-	 * @param registrationRun UUID describing the current registration run
-	 * @param timeout Timeout of the last registration attempt
-	 * @param maxTimeout Maximum timeout between registration attempts
-	 * @param deadline Deadline for the registration
+	 * @param cause Cause of the cancellation.
 	 */
-	void handleResourceManagerRegistration(
-		final JobMasterRegistration jobMasterRegistration,
-		final int attemptNumber,
-		final Future<ResourceManagerGateway> resourceManagerFuture,
-		final UUID registrationRun,
-		final FiniteDuration timeout,
-		final FiniteDuration maxTimeout,
-		final Deadline deadline) {
-
-		// filter out concurrent registration runs
-		if (registrationRun.equals(currentRegistrationRun)) {
-
-			log.info("Start registration attempt #{}.", attemptNumber);
-
-			if (deadline.isOverdue()) {
-				// we've exceeded our registration deadline. This means that we have to shutdown the JobMaster
-				log.error("Exceeded registration deadline without successfully registering at the ResourceManager.");
-				shutDown();
-			} else {
-				Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> registrationResponseFuture = resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, Future<Tuple2<RegistrationResponse, ResourceManagerGateway>>>() {
-					@Override
-					public Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> apply(ResourceManagerGateway resourceManagerGateway) {
-						return resourceManagerGateway.registerJobMaster(jobMasterRegistration, timeout).zip(Futures.successful(resourceManagerGateway));
-					}
-				}, executionContext);
-
-				registrationResponseFuture.onComplete(new OnComplete<Tuple2<RegistrationResponse, ResourceManagerGateway>>() {
-					@Override
-					public void onComplete(Throwable failure, Tuple2<RegistrationResponse, ResourceManagerGateway> tuple) throws Throwable {
-						if (failure != null) {
-							if (failure instanceof TimeoutException) {
-								// we haven't received an answer in the given timeout interval,
-								// so increase it and try again.
-								final FiniteDuration newTimeout = timeout.$times(2L).min(maxTimeout);
-
-								handleResourceManagerRegistration(
-									jobMasterRegistration,
-									attemptNumber + 1,
-									resourceManagerFuture,
-									registrationRun,
-									newTimeout,
-									maxTimeout,
-									deadline);
-							} else {
-								log.error("Received unknown error while registering at the ResourceManager.", failure);
-								shutDown();
-							}
-						} else {
-							final RegistrationResponse response = tuple._1();
-							final ResourceManagerGateway gateway = tuple._2();
-
-							if (response.isSuccess()) {
-								finishResourceManagerRegistration(gateway, response.getInstanceID());
-							} else {
-								log.info("The registration was refused. Try again.");
-
-								scheduledExecutorService.schedule(new Runnable() {
-									@Override
-									public void run() {
-										// we have to execute scheduled runnable in the main thread
-										// because we need consistency wrt currentRegistrationRun
-										runAsync(new Runnable() {
-											@Override
-											public void run() {
-												// our registration attempt was refused. Start over.
-												handleResourceManagerRegistration(
-													jobMasterRegistration,
-													1,
-													resourceManagerFuture,
-													registrationRun,
-													initialRegistrationTimeout,
-													maxTimeout,
-													deadline);
-											}
-										});
-									}
-								}, failedRegistrationDelay, TimeUnit.MILLISECONDS);
-							}
-						}
-					}
-				}, getMainThreadExecutionContext()); // use the main thread execution context to execute the call back in the main thread
-			}
-		} else {
-			log.info("Discard out-dated registration run.");
-		}
+	private void cancelAndClearEverything(Throwable cause) {
+		// currently, nothing to do here
 	}
 
-	/**
-	 * Finish the resource manager registration by setting the new resource manager gateway.
-	 *
-	 * @param resourceManager New resource manager gateway
-	 * @param instanceID Instance id assigned by the resource manager
-	 */
-	void finishResourceManagerRegistration(ResourceManagerGateway resourceManager, InstanceID instanceID) {
-		log.info("Successfully registered at the ResourceManager under instance id {}.", instanceID);
-		this.resourceManager = resourceManager;
-	}
+	// ------------------------------------------------------------------------
+	//  Utility classes
+	// ------------------------------------------------------------------------
+	private class JobMasterLeaderContender implements LeaderContender {
 
-	/**
-	 * Return if the job master is connected to a resource manager.
-	 *
-	 * @return true if the job master is connected to the resource manager
-	 */
-	public boolean isConnected() {
-		return resourceManager != null;
+		@Override
+		public void grantLeadership(UUID leaderSessionID) {
+			JobMaster.this.grantJobMasterLeadership(leaderSessionID);
+		}
+
+		@Override
+		public void revokeLeadership() {
+			JobMaster.this.revokeJobMasterLeadership();
+		}
+
+		@Override
+		public String getAddress() {
+			return JobMaster.this.getAddress();
+		}
+
+		@Override
+		public void handleError(Exception exception) {
+			onJobMasterElectionError(exception);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb3d157/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 7b4ab89..2790cf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
-
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
@@ -31,6 +34,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import org.mockito.Mockito;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -80,51 +84,4 @@ public class AkkaRpcServiceTest extends TestLogger {
 
 		assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
 	}
-
-	// ------------------------------------------------------------------------
-	//  specific component tests - should be moved to the test classes
-	//  for those components
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Tests that the {@link JobMaster} can connect to the {@link ResourceManager} using the
-	 * {@link AkkaRpcService}.
-	 */
-	@Test
-	public void testJobMasterResourceManagerRegistration() throws Exception {
-		Timeout akkaTimeout = new Timeout(10, TimeUnit.SECONDS);
-		ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
-		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
-		AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, akkaTimeout);
-		AkkaRpcService akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaTimeout);
-		ExecutorService executorService = new ForkJoinPool();
-
-		ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService);
-		JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService);
-
-		resourceManager.start();
-		jobMaster.start();
-
-		ResourceManagerGateway rm = resourceManager.getSelf();
-
-		assertTrue(rm instanceof AkkaGateway);
-
-		AkkaGateway akkaClient = (AkkaGateway) rm;
-
-		
-		jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getRpcEndpoint()));
-
-		// wait for successful registration
-		FiniteDuration timeout = new FiniteDuration(200, TimeUnit.SECONDS);
-		Deadline deadline = timeout.fromNow();
-
-		while (deadline.hasTimeLeft() && !jobMaster.isConnected()) {
-			Thread.sleep(100);
-		}
-
-		assertFalse(deadline.isOverdue());
-
-		jobMaster.shutDown();
-		resourceManager.shutDown();
-	}
 }