You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/08/03 14:36:57 UTC

[flink] 06/06: [FLINK-9936][mesos] Wait for leadership before creating MesosResourceManager components

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b1500d4fe39a44bfaa03b27216627e3caf55240
Author: gyao <ga...@data-artisans.com>
AuthorDate: Mon Jul 30 10:57:40 2018 +0800

    [FLINK-9936][mesos] Wait for leadership before creating MesosResourceManager components
    
    This closes #6464.
---
 .../clusterframework/MesosResourceManager.java     | 130 +++++++++++++--------
 .../clusterframework/MesosResourceManagerTest.java |  21 ++++
 .../runtime/resourcemanager/ResourceManager.java   |  90 ++++++++++----
 3 files changed, 173 insertions(+), 68 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index e24214d..6fc5322 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -84,6 +84,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
@@ -138,6 +139,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
 	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
 
+	private MesosConfiguration initializedMesosConfig;
+
 	public MesosResourceManager(
 			// base class
 			RpcService rpcService,
@@ -220,9 +223,6 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	//  Resource Manager overrides
 	// ------------------------------------------------------------------------
 
-	/**
-	 * Starts the Mesos-specifics.
-	 */
 	@Override
 	protected void initialize() throws ResourceManagerException {
 		// create and start the worker store
@@ -233,9 +233,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			throw new ResourceManagerException("Unable to initialize the worker store.", e);
 		}
 
-		// register with Mesos
-		// TODO : defer connection until RM acquires leadership
-
+		// Prepare to register with Mesos
 		Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
 			.clone()
 			.setCheckpoint(true);
@@ -251,49 +249,86 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			throw new ResourceManagerException("Unable to recover the framework ID.", e);
 		}
 
-		MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+		initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
 		MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
+
+		this.selfActor = createSelfActor();
+
+		// configure the artifact server to serve the TM container artifacts
+		try {
+			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
+		}
+		catch (IOException e) {
+			throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
+		}
+	}
+
+	@Override
+	protected CompletableFuture<Void> prepareLeadershipAsync() {
+		Preconditions.checkState(initializedMesosConfig != null);
+
 		schedulerDriver = initializedMesosConfig.createDriver(
 			new MesosResourceManagerSchedulerCallback(),
 			false);
 
 		// create supporting actors
-		selfActor = createSelfActor();
 		connectionMonitor = createConnectionMonitor();
 		launchCoordinator = createLaunchCoordinator(schedulerDriver, selfActor);
 		reconciliationCoordinator = createReconciliationCoordinator(schedulerDriver);
 		taskMonitor = createTaskMonitor(schedulerDriver);
 
-		// recover state
-		try {
-			recoverWorkers();
-		} catch (Exception e) {
-			throw new ResourceManagerException("Unable to recover Mesos worker state.", e);
-		}
+		return getWorkersAsync().thenApplyAsync((tasksFromPreviousAttempts) -> {
+			// recover state
+			recoverWorkers(tasksFromPreviousAttempts);
 
-		// configure the artifact server to serve the TM container artifacts
-		try {
-			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
-		}
-		catch (IOException e) {
-			throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
-		}
+			// begin scheduling
+			connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+			schedulerDriver.start();
 
-		// begin scheduling
-		connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
-		schedulerDriver.start();
+			LOG.info("Mesos resource manager started.");
+			return null;
+		}, getMainThreadExecutor());
+	}
+
+	@Override
+	protected CompletableFuture<Void> clearStateAsync() {
+		schedulerDriver.stop(true);
+
+		workersInNew.clear();
+		workersInLaunch.clear();
+		workersBeingReturned.clear();
 
-		LOG.info("Mesos resource manager initialized.");
+		return stopSupportingActorsAsync();
 	}
 
 	/**
-	 * Recover framework/worker information persisted by a prior incarnation of the RM.
+	 * Fetches framework/worker information persisted by a prior incarnation of the RM and removes workers in state New from the worker store.
 	 */
-	private void recoverWorkers() throws Exception {
+	private CompletableFuture<List<MesosWorkerStore.Worker>> getWorkersAsync() {
 		// if this resource manager is recovering from failure,
 		// then some worker tasks are most likely still alive and we can re-obtain them
-		final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+		return CompletableFuture.supplyAsync(() -> {
+			try {
+				final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+				for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+					if (worker.state() == MesosWorkerStore.WorkerState.New) {
+						// remove new workers because allocation requests are transient
+						workerStore.removeWorker(worker.taskID());
+					}
+				}
+				return tasksFromPreviousAttempts;
+			} catch (final Exception e) {
+				throw new CompletionException(new ResourceManagerException(e));
+			}
+		}, getRpcService().getExecutor());
+	}
 
+	/**
+	 * Recovers given framework/worker information.
+	 *
+	 * @see #getWorkersAsync()
+	 */
+	private void recoverWorkers(final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts) {
 		assert(workersInNew.isEmpty());
 		assert(workersInLaunch.isEmpty());
 		assert(workersBeingReturned.isEmpty());
@@ -304,15 +339,10 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
 
 			for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
-				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
-
 				switch(worker.state()) {
-					case New:
-						// remove new workers because allocation requests are transient
-						workerStore.removeWorker(worker.taskID());
-						break;
 					case Launched:
 						workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+						final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
 						toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
 						break;
 					case Released:
@@ -329,8 +359,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		}
 	}
 
-	@Override
-	public CompletableFuture<Void> postStop() {
+	private CompletableFuture<Void> stopSupportingActorsAsync() {
 		FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);
 
 		CompletableFuture<Boolean> stopTaskMonitorFuture = stopActor(taskMonitor, stopTimeout);
@@ -345,23 +374,23 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		CompletableFuture<Boolean> stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout);
 		reconciliationCoordinator = null;
 
-		CompletableFuture<Void> stopFuture = CompletableFuture.allOf(
+		return CompletableFuture.allOf(
 			stopTaskMonitorFuture,
 			stopConnectionMonitorFuture,
 			stopLaunchCoordinatorFuture,
 			stopReconciliationCoordinatorFuture);
+	}
 
-		final CompletableFuture<Void> terminationFuture = super.postStop();
-
-		return stopFuture.thenCombine(
-			terminationFuture,
-			(Void voidA, Void voidB) -> null);
+	@Override
+	public CompletableFuture<Void> postStop() {
+		return stopSupportingActorsAsync().thenCompose((ignored) -> super.postStop());
 	}
 
 	@Override
 	protected void internalDeregisterApplication(
 			ApplicationStatus finalStatus,
 			@Nullable String diagnostics) throws ResourceManagerException {
+
 		LOG.info("Shutting down and unregistering as a Mesos framework.");
 
 		Exception exception = null;
@@ -627,11 +656,16 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	 * Tries to shut down the given actor gracefully.
 	 *
 	 * @param actorRef specifying the actor to shut down
-	 * @param timeout for the graceful shut down
-	 * @return Future containing the result of the graceful shut down
+	 * @param timeout  for the graceful shut down
+	 * @return A future that finishes with {@code true} if and only if the actor could be stopped
+	 * gracefully or {@code actorRef} was {@code null}.
 	 */
-	private CompletableFuture<Boolean> stopActor(final ActorRef actorRef, FiniteDuration timeout) {
-		return FutureUtils.toJava(Patterns.gracefulStop(actorRef, timeout))
+	private CompletableFuture<Boolean> stopActor(@Nullable final ActorRef actorRef, FiniteDuration timeout) {
+		if (actorRef == null) {
+			return CompletableFuture.completedFuture(true);
+		}
+
+		return FutureUtils.<Boolean>toJava(Patterns.gracefulStop(actorRef, timeout))
 			.exceptionally(
 				(Throwable throwable) -> {
 					// The actor did not stop gracefully in time, try to directly stop it
@@ -639,7 +673,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 
 					log.warn("Could not stop actor {} gracefully.", actorRef.path(), throwable);
 
-					return true;
+					return false;
 				}
 			);
 	}
@@ -794,7 +828,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 
 		@Override
 		public void disconnected(SchedulerDriver driver) {
-			runAsync(new Runnable() {
+			runAsyncWithoutFencing(new Runnable() {
 				@Override
 				public void run() {
 					MesosResourceManager.this.disconnected(new Disconnected());
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 5af3fa0..171e408 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -505,6 +505,7 @@ public class MesosResourceManagerTest extends TestLogger {
 		@Override
 		public void close() throws Exception {
 			rpcService.stopService().get();
+			fatalErrorHandler.rethrowError();
 		}
 	}
 
@@ -807,4 +808,24 @@ public class MesosResourceManagerTest extends TestLogger {
 			resourceManager.taskRouter.expectMsgClass(Disconnected.class);
 		}};
 	}
+
+	@Test
+	public void testClearStateAfterRevokeLeadership() throws Exception {
+		new Context() {{
+			final MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
+			final MesosWorkerStore.Worker worker2 = MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host);
+			final MesosWorkerStore.Worker worker3 = MesosWorkerStore.Worker.newWorker(task3).launchWorker(slave1, slave1host).releaseWorker();
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			when(rmServices.workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1, worker2, worker3)).thenReturn(Collections.emptyList());
+
+			startResourceManager();
+			rmServices.rmLeaderElectionService.notLeader();
+			rmServices.grantLeadership();
+
+			assertThat(resourceManager.workersInNew.size(), equalTo(0));
+			assertThat(resourceManager.workersInLaunch.size(), equalTo(0));
+			assertThat(resourceManager.workersBeingReturned.size(), equalTo(0));
+			verify(rmServices.schedulerDriver).stop(true);
+		}};
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a992632..7a54224 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -139,6 +139,14 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	/** All registered listeners for status updates of the ResourceManager. */
 	private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;
 
+	/**
+	 * Represents asynchronous state clearing work.
+	 *
+	 * @see #clearStateAsync()
+	 * @see #clearStateInternal()
+	 */
+	private CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);
+
 	public ResourceManager(
 			RpcService rpcService,
 			String resourceManagerEndpointId,
@@ -192,6 +200,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 
 		leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 
+		initialize();
+
 		try {
 			leaderElectionService.start(this);
 		} catch (Exception e) {
@@ -203,8 +213,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		} catch (Exception e) {
 			throw new ResourceManagerException("Could not start the job leader id service.", e);
 		}
-
-		initialize();
 	}
 
 	@Override
@@ -233,7 +241,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
 
-		clearState();
+		clearStateInternal();
 
 		if (exception != null) {
 			return FutureUtils.completedExceptionally(
@@ -724,7 +732,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		}
 	}
 
-	private void clearState() {
+	private void clearStateInternal() {
 		jobManagerRegistrations.clear();
 		jmResourceIdRegistrations.clear();
 		taskExecutors.clear();
@@ -734,6 +742,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		} catch (Exception e) {
 			onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e));
 		}
+		clearStateFuture = clearStateAsync();
 	}
 
 	/**
@@ -886,26 +895,45 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	 */
 	@Override
 	public void grantLeadership(final UUID newLeaderSessionID) {
-		runAsyncWithoutFencing(
-			() -> {
-				final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
-
-				log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
+		final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
+			.thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());
+
+		final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
+			(acceptLeadership) -> {
+				if (acceptLeadership) {
+					// confirming the leader session ID might be blocking, so it runs on the RPC service's executor
+					leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+				}
+			},
+			getRpcService().getExecutor());
 
-				// clear the state if we've been the leader before
-				if (getFencingToken() != null) {
-					clearState();
+		confirmationFuture.whenComplete(
+			(Void ignored, Throwable throwable) -> {
+				if (throwable != null) {
+					onFatalError(ExceptionUtils.stripCompletionException(throwable));
 				}
+			});
+	}
 
-				setFencingToken(newResourceManagerId);
+	private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
+		if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
+			final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
 
-				slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
+			log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
 
-				getRpcService().execute(
-					() ->
-						// confirming the leader session ID might be blocking,
-						leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
-			});
+			// clear the state if we've been the leader before
+			if (getFencingToken() != null) {
+				clearStateInternal();
+			}
+
+			setFencingToken(newResourceManagerId);
+
+			slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
+
+			return prepareLeadershipAsync().thenApply(ignored -> true);
+		} else {
+			return CompletableFuture.completedFuture(false);
+		}
 	}
 
 	/**
@@ -917,7 +945,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			() -> {
 				log.info("ResourceManager {} was revoked leadership. Clearing fencing token.", getAddress());
 
-				clearState();
+				clearStateInternal();
 
 				setFencingToken(null);
 
@@ -947,6 +975,28 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	protected abstract void initialize() throws ResourceManagerException;
 
 	/**
+	 * This method can be overridden to add a (non-blocking) initialization routine to the
+	 * ResourceManager that will be called when leadership is granted but before leadership is
+	 * confirmed.
+	 *
+	 * @return Returns a {@code CompletableFuture} that completes when the computation is finished.
+	 */
+	protected CompletableFuture<Void> prepareLeadershipAsync() {
+		return CompletableFuture.completedFuture(null);
+	}
+
+	/**
+	 * This method can be overridden to add a (non-blocking) state clearing routine to the
+	 * ResourceManager that will be called whenever its state is cleared, e.g. when leadership is revoked.
+	 *
+	 * @return Returns a {@code CompletableFuture} that completes when the state clearing routine
+	 * is finished.
+	 */
+	protected CompletableFuture<Void> clearStateAsync() {
+		return CompletableFuture.completedFuture(null);
+	}
+
+	/**
 	 * The framework specific code to deregister the application. This should report the
 	 * application's final status and shut down the resource manager cleanly.
 	 *