You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/03 14:38:29 UTC

[GitHub] asfgit closed pull request #6464: [FLINK-9936][mesos] Mesos resource manager unable to connect to master after failover

asfgit closed pull request #6464: [FLINK-9936][mesos] Mesos resource manager unable to connect to master after failover 
URL: https://github.com/apache/flink/pull/6464
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index e24214d28c1..bc281348fa4 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -84,6 +84,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
@@ -138,6 +139,8 @@
 	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
 	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
 
+	private MesosConfiguration initializedMesosConfig;
+
 	public MesosResourceManager(
 			// base class
 			RpcService rpcService,
@@ -220,9 +223,6 @@ protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriv
 	//  Resource Manager overrides
 	// ------------------------------------------------------------------------
 
-	/**
-	 * Starts the Mesos-specifics.
-	 */
 	@Override
 	protected void initialize() throws ResourceManagerException {
 		// create and start the worker store
@@ -233,9 +233,7 @@ protected void initialize() throws ResourceManagerException {
 			throw new ResourceManagerException("Unable to initialize the worker store.", e);
 		}
 
-		// register with Mesos
-		// TODO : defer connection until RM acquires leadership
-
+		// Prepare to register with Mesos
 		Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
 			.clone()
 			.setCheckpoint(true);
@@ -251,49 +249,86 @@ protected void initialize() throws ResourceManagerException {
 			throw new ResourceManagerException("Unable to recover the framework ID.", e);
 		}
 
-		MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+		initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
 		MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
+
+		this.selfActor = createSelfActor();
+
+		// configure the artifact server to serve the TM container artifacts
+		try {
+			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
+		}
+		catch (IOException e) {
+			throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
+		}
+	}
+
+	@Override
+	protected CompletableFuture<Void> prepareLeadershipAsync() {
+		Preconditions.checkState(initializedMesosConfig != null);
+
 		schedulerDriver = initializedMesosConfig.createDriver(
 			new MesosResourceManagerSchedulerCallback(),
 			false);
 
 		// create supporting actors
-		selfActor = createSelfActor();
 		connectionMonitor = createConnectionMonitor();
 		launchCoordinator = createLaunchCoordinator(schedulerDriver, selfActor);
 		reconciliationCoordinator = createReconciliationCoordinator(schedulerDriver);
 		taskMonitor = createTaskMonitor(schedulerDriver);
 
-		// recover state
-		try {
-			recoverWorkers();
-		} catch (Exception e) {
-			throw new ResourceManagerException("Unable to recover Mesos worker state.", e);
-		}
+		return getWorkersAsync().thenApplyAsync((tasksFromPreviousAttempts) -> {
+			// recover state
+			recoverWorkers(tasksFromPreviousAttempts);
 
-		// configure the artifact server to serve the TM container artifacts
-		try {
-			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
-		}
-		catch (IOException e) {
-			throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
-		}
+			// begin scheduling
+			connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+			schedulerDriver.start();
 
-		// begin scheduling
-		connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
-		schedulerDriver.start();
+			LOG.info("Mesos resource manager started.");
+			return null;
+		}, getMainThreadExecutor());
+	}
+
+	@Override
+	protected CompletableFuture<Void> clearStateAsync() {
+		schedulerDriver.stop(true);
+
+		workersInNew.clear();
+		workersInLaunch.clear();
+		workersBeingReturned.clear();
 
-		LOG.info("Mesos resource manager initialized.");
+		return stopSupportingActorsAsync();
 	}
 
 	/**
-	 * Recover framework/worker information persisted by a prior incarnation of the RM.
+	 * Fetches framework/worker information persisted by a prior incarnation of the RM.
 	 */
-	private void recoverWorkers() throws Exception {
+	private CompletableFuture<List<MesosWorkerStore.Worker>> getWorkersAsync() {
 		// if this resource manager is recovering from failure,
 		// then some worker tasks are most likely still alive and we can re-obtain them
-		final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+		return CompletableFuture.supplyAsync(() -> {
+			try {
+				final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+				for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+					if (worker.state() == MesosWorkerStore.WorkerState.New) {
+						// remove new workers because allocation requests are transient
+						workerStore.removeWorker(worker.taskID());
+					}
+				}
+				return tasksFromPreviousAttempts;
+			} catch (final Exception e) {
+				throw new CompletionException(new ResourceManagerException(e));
+			}
+		}, getRpcService().getExecutor());
+	}
 
+	/**
+	 * Recovers given framework/worker information.
+	 *
+	 * @see #getWorkersAsync()
+	 */
+	private void recoverWorkers(final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts) {
 		assert(workersInNew.isEmpty());
 		assert(workersInLaunch.isEmpty());
 		assert(workersBeingReturned.isEmpty());
@@ -304,15 +339,10 @@ private void recoverWorkers() throws Exception {
 			List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
 
 			for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
-				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
-
 				switch(worker.state()) {
-					case New:
-						// remove new workers because allocation requests are transient
-						workerStore.removeWorker(worker.taskID());
-						break;
 					case Launched:
 						workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+						final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
 						toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
 						break;
 					case Released:
@@ -329,8 +359,7 @@ private void recoverWorkers() throws Exception {
 		}
 	}
 
-	@Override
-	public CompletableFuture<Void> postStop() {
+	private CompletableFuture<Void> stopSupportingActorsAsync() {
 		FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);
 
 		CompletableFuture<Boolean> stopTaskMonitorFuture = stopActor(taskMonitor, stopTimeout);
@@ -345,23 +374,23 @@ private void recoverWorkers() throws Exception {
 		CompletableFuture<Boolean> stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout);
 		reconciliationCoordinator = null;
 
-		CompletableFuture<Void> stopFuture = CompletableFuture.allOf(
+		return CompletableFuture.allOf(
 			stopTaskMonitorFuture,
 			stopConnectionMonitorFuture,
 			stopLaunchCoordinatorFuture,
 			stopReconciliationCoordinatorFuture);
+	}
 
-		final CompletableFuture<Void> terminationFuture = super.postStop();
-
-		return stopFuture.thenCombine(
-			terminationFuture,
-			(Void voidA, Void voidB) -> null);
+	@Override
+	public CompletableFuture<Void> postStop() {
+		return stopSupportingActorsAsync().thenCompose((ignored) -> super.postStop());
 	}
 
 	@Override
 	protected void internalDeregisterApplication(
 			ApplicationStatus finalStatus,
 			@Nullable String diagnostics) throws ResourceManagerException {
+
 		LOG.info("Shutting down and unregistering as a Mesos framework.");
 
 		Exception exception = null;
@@ -627,10 +656,15 @@ public void taskTerminated(TaskMonitor.TaskTerminated message) {
 	 * Tries to shut down the given actor gracefully.
 	 *
 	 * @param actorRef specifying the actor to shut down
-	 * @param timeout for the graceful shut down
-	 * @return Future containing the result of the graceful shut down
+	 * @param timeout  for the graceful shut down
+	 * @return A future that finishes with {@code true} iff. the actor could be stopped gracefully
+	 * or {@code actorRef} was {@code null}.
 	 */
-	private CompletableFuture<Boolean> stopActor(final ActorRef actorRef, FiniteDuration timeout) {
+	private CompletableFuture<Boolean> stopActor(@Nullable final ActorRef actorRef, FiniteDuration timeout) {
+		if (actorRef == null) {
+			return CompletableFuture.completedFuture(true);
+		}
+
 		return FutureUtils.toJava(Patterns.gracefulStop(actorRef, timeout))
 			.exceptionally(
 				(Throwable throwable) -> {
@@ -639,7 +673,7 @@ public void taskTerminated(TaskMonitor.TaskTerminated message) {
 
 					log.warn("Could not stop actor {} gracefully.", actorRef.path(), throwable);
 
-					return true;
+					return false;
 				}
 			);
 	}
@@ -794,7 +828,7 @@ public void run() {
 
 		@Override
 		public void disconnected(SchedulerDriver driver) {
-			runAsync(new Runnable() {
+			runAsyncWithoutFencing(new Runnable() {
 				@Override
 				public void run() {
 					MesosResourceManager.this.disconnected(new Disconnected());
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 5af3fa0a7d1..171e4087ddc 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -505,6 +505,7 @@ public void registerJobMaster(MockJobMaster jobMaster) throws Exception  {
 		@Override
 		public void close() throws Exception {
 			rpcService.stopService().get();
+			fatalErrorHandler.rethrowError();
 		}
 	}
 
@@ -807,4 +808,24 @@ public void testDisconnected() throws Exception {
 			resourceManager.taskRouter.expectMsgClass(Disconnected.class);
 		}};
 	}
+
+	@Test
+	public void testClearStateAfterRevokeLeadership() throws Exception {
+		new Context() {{
+			final MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
+			final MesosWorkerStore.Worker worker2 = MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host);
+			final MesosWorkerStore.Worker worker3 = MesosWorkerStore.Worker.newWorker(task3).launchWorker(slave1, slave1host).releaseWorker();
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			when(rmServices.workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1, worker2, worker3)).thenReturn(Collections.emptyList());
+
+			startResourceManager();
+			rmServices.rmLeaderElectionService.notLeader();
+			rmServices.grantLeadership();
+
+			assertThat(resourceManager.workersInNew.size(), equalTo(0));
+			assertThat(resourceManager.workersInLaunch.size(), equalTo(0));
+			assertThat(resourceManager.workersBeingReturned.size(), equalTo(0));
+			verify(rmServices.schedulerDriver).stop(true);
+		}};
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a992632b666..7a54224e59b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -139,6 +139,14 @@
 	/** All registered listeners for status updates of the ResourceManager. */
 	private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;
 
+	/**
+	 * Represents asynchronous state clearing work.
+	 *
+	 * @see #clearStateAsync()
+	 * @see #clearStateInternal()
+	 */
+	private CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);
+
 	public ResourceManager(
 			RpcService rpcService,
 			String resourceManagerEndpointId,
@@ -192,6 +200,8 @@ public void start() throws Exception {
 
 		leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 
+		initialize();
+
 		try {
 			leaderElectionService.start(this);
 		} catch (Exception e) {
@@ -203,8 +213,6 @@ public void start() throws Exception {
 		} catch (Exception e) {
 			throw new ResourceManagerException("Could not start the job leader id service.", e);
 		}
-
-		initialize();
 	}
 
 	@Override
@@ -233,7 +241,7 @@ public void start() throws Exception {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
 
-		clearState();
+		clearStateInternal();
 
 		if (exception != null) {
 			return FutureUtils.completedExceptionally(
@@ -724,7 +732,7 @@ public void requestHeartbeat(ResourceID resourceID, Void payload) {
 		}
 	}
 
-	private void clearState() {
+	private void clearStateInternal() {
 		jobManagerRegistrations.clear();
 		jmResourceIdRegistrations.clear();
 		taskExecutors.clear();
@@ -734,6 +742,7 @@ private void clearState() {
 		} catch (Exception e) {
 			onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e));
 		}
+		clearStateFuture = clearStateAsync();
 	}
 
 	/**
@@ -886,26 +895,45 @@ protected void onFatalError(Throwable t) {
 	 */
 	@Override
 	public void grantLeadership(final UUID newLeaderSessionID) {
-		runAsyncWithoutFencing(
-			() -> {
-				final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
-
-				log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
+		final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
+			.thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());
+
+		final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
+			(acceptLeadership) -> {
+				if (acceptLeadership) {
+					// confirming the leader session ID might be blocking,
+					leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+				}
+			},
+			getRpcService().getExecutor());
 
-				// clear the state if we've been the leader before
-				if (getFencingToken() != null) {
-					clearState();
+		confirmationFuture.whenComplete(
+			(Void ignored, Throwable throwable) -> {
+				if (throwable != null) {
+					onFatalError(ExceptionUtils.stripCompletionException(throwable));
 				}
+			});
+	}
 
-				setFencingToken(newResourceManagerId);
+	private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
+		if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
+			final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
 
-				slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
+			log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
 
-				getRpcService().execute(
-					() ->
-						// confirming the leader session ID might be blocking,
-						leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
-			});
+			// clear the state if we've been the leader before
+			if (getFencingToken() != null) {
+				clearStateInternal();
+			}
+
+			setFencingToken(newResourceManagerId);
+
+			slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
+
+			return prepareLeadershipAsync().thenApply(ignored -> true);
+		} else {
+			return CompletableFuture.completedFuture(false);
+		}
 	}
 
 	/**
@@ -917,7 +945,7 @@ public void revokeLeadership() {
 			() -> {
 				log.info("ResourceManager {} was revoked leadership. Clearing fencing token.", getAddress());
 
-				clearState();
+				clearStateInternal();
 
 				setFencingToken(null);
 
@@ -946,6 +974,28 @@ public void handleError(final Exception exception) {
 	 */
 	protected abstract void initialize() throws ResourceManagerException;
 
+	/**
+	 * This method can be overridden to add a (non-blocking) initialization routine to the
+	 * ResourceManager that will be called when leadership is granted but before leadership is
+	 * confirmed.
+	 *
+	 * @return Returns a {@code CompletableFuture} that completes when the computation is finished.
+	 */
+	protected CompletableFuture<Void> prepareLeadershipAsync() {
+		return CompletableFuture.completedFuture(null);
+	}
+
+	/**
+	 * This method can be overridden to add a (non-blocking) state clearing routine to the
+	 * ResourceManager that will be called when leadership is revoked.
+	 *
+	 * @return Returns a {@code CompletableFuture} that completes when the state clearing routine
+	 * is finished.
+	 */
+	protected CompletableFuture<Void> clearStateAsync() {
+		return CompletableFuture.completedFuture(null);
+	}
+
 	/**
 	 * The framework specific code to deregister the application. This should report the
 	 * application's final status and shut down the resource manager cleanly.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index a8837846311..a1f6227be02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -67,7 +66,6 @@ public void tearDown() throws ExecutionException, InterruptedException {
 	 */
 	@Test
 	public void testRequestTaskManagerInfo() throws Exception {
-		final Configuration configuration = new Configuration();
 		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		final SlotManager slotManager = new SlotManager(
 			rpcService.getScheduledExecutor(),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services