Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/06 21:31:53 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #13313: [FLINK-18722][mesos] Migrate MesosResourceManager to the new MesosResourceManagerDriver

tillrohrmann commented on a change in pull request #13313:
URL: https://github.com/apache/flink/pull/13313#discussion_r500423676



##########
File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
##########
@@ -882,7 +820,7 @@ public void run() {
 	/**
 	 * Adapts incoming Akka messages as RPC calls to the resource manager.
 	 */
-	private class AkkaAdapter extends UntypedAbstractActor {
+	public class AkkaAdapter extends UntypedAbstractActor {

Review comment:
       I'd suggest making this class package-private and moving `MesosResourceManagerActorFactory` into the package of the `MesosResourceManager`.
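       A minimal sketch of the suggested visibility change (the forwarding body is hypothetical; it assumes `MesosResourceManagerActorFactory` is moved into `org.apache.flink.mesos.runtime.clusterframework` so it can still instantiate the adapter):

```java
// Sketch only: dropping `public` makes the adapter package-private.
class AkkaAdapter extends UntypedAbstractActor {
	@Override
	public void onReceive(Object message) {
		handleRpcMessage(message); // hypothetical forwarding helper
	}
}
```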

##########
File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerDriver.java
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.mesos.runtime.clusterframework.actors.MesosResourceManagerActorFactory;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedAbstractActor;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Mesos deployment.
+ */
+public class MesosResourceManagerDriver extends AbstractResourceManagerDriver<RegisteredMesosWorkerNode> {
+
+	/** The Mesos configuration (master and framework info). */
+	private final MesosConfiguration mesosConfig;
+
+	/** The Mesos services needed by the resource manager. */
+	private final MesosServices mesosServices;
+
+	/** The TaskManager container parameters (like container memory size). */
+	private final MesosTaskManagerParameters taskManagerParameters;
+
+	/** Container specification for launching a TM. */
+	private final ContainerSpecification taskManagerContainerSpec;
+
+	/** Server for HTTP artifacts. */
+	private final MesosArtifactServer artifactServer;
+
+	/** Persistent storage of allocated containers. */
+	private MesosWorkerStore workerStore;
+
+	/** Factory for creating local actors. */
+	private final MesosResourceManagerActorFactory actorFactory;
+
+	/** Web url to show in mesos page. */
+	@Nullable
+	private final String webUiUrl;
+
+	/** Mesos scheduler driver. */
+	private SchedulerDriver schedulerDriver;
+
+	/** an adapter to receive messages from Akka actors. */
+	private ActorRef selfActor;
+
+	private ActorRef connectionMonitor;
+
+	private ActorRef taskMonitor;
+
+	private ActorRef launchCoordinator;
+
+	private ActorRef reconciliationCoordinator;
+
+	/** Workers that are requested but not yet offered. */
+	private final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
+	private final Map<ResourceID, CompletableFuture<RegisteredMesosWorkerNode>> requestResourceFutures;
+
+	private MesosConfiguration initializedMesosConfig;
+
+	public MesosResourceManagerDriver(
+			Configuration flinkConfig,
+			MesosServices mesosServices,
+			MesosConfiguration mesosConfig,
+			MesosTaskManagerParameters taskManagerParameters,
+			ContainerSpecification taskManagerContainerSpec,
+			@Nullable String webUiUrl) {
+		super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+		this.mesosServices = Preconditions.checkNotNull(mesosServices);
+		this.actorFactory = Preconditions.checkNotNull(mesosServices.createMesosResourceManagerActorFactory());
+
+		this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
+
+		this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
+
+		this.taskManagerParameters = Preconditions.checkNotNull(taskManagerParameters);
+		this.taskManagerContainerSpec = Preconditions.checkNotNull(taskManagerContainerSpec);
+		this.webUiUrl = webUiUrl;
+
+		this.workersInNew = new HashMap<>();
+		this.requestResourceFutures = new HashMap<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  ResourceManagerDriver
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected void initializeInternal() throws Exception {
+		// create and start the worker store
+		try {
+			this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig);
+			workerStore.start();
+		} catch (Exception e) {
+			throw new ResourceManagerException("Unable to initialize the worker store.", e);
+		}
+
+		// Prepare to register with Mesos
+		Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
+				.clone()
+				.setCheckpoint(true);
+		if (webUiUrl != null) {
+			frameworkInfo.setWebuiUrl(webUiUrl);
+		}
+
+		try {
+			Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
+			if (frameworkID.isEmpty()) {
+				log.info("Registering as new framework.");
+			} else {
+				log.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
+				frameworkInfo.setId(frameworkID.get());
+			}
+		} catch (Exception e) {
+			throw new ResourceManagerException("Unable to recover the framework ID.", e);
+		}
+
+		initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+		MesosConfiguration.logMesosConfig(log, initializedMesosConfig);
+
+		this.selfActor = actorFactory.createSelfActorForMesosResourceManagerDriver(this);
+
+		// configure the artifact server to serve the TM container artifacts
+		try {
+			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
+		}
+		catch (IOException e) {
+			throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
+		}
+	}
+
+	@Override
+	public CompletableFuture<Void> terminate() {
+		return stopSupportingActorsAsync();
+	}
+
+	@Override
+	public CompletableFuture<Void> onGrantLeadership() {
+		Preconditions.checkState(initializedMesosConfig != null);
+
+		schedulerDriver = mesosServices.createMesosSchedulerDriver(
+				initializedMesosConfig,
+				new MesosResourceManagerSchedulerCallback(),
+				false);
+
+		// create supporting actors
+		connectionMonitor = actorFactory.createConnectionMonitor(flinkConfig);
+		launchCoordinator = actorFactory.createLaunchCoordinator(flinkConfig, selfActor, schedulerDriver, createOptimizer());
+		reconciliationCoordinator = actorFactory.createReconciliationCoordinator(flinkConfig, schedulerDriver);
+		taskMonitor = actorFactory.createTaskMonitor(flinkConfig, selfActor, schedulerDriver);
+
+		return getWorkersAsync().thenApplyAsync((tasksFromPreviousAttempts) -> {
+			// recover state
+			recoverWorkers(tasksFromPreviousAttempts);
+
+			// begin scheduling
+			connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+			schedulerDriver.start();
+
+			log.info("Mesos resource manager started.");
+			return null;
+		}, getMainThreadExecutor());
+	}
+
+	@Override
+	public CompletableFuture<Void> onRevokeLeadership() {
+		schedulerDriver.stop(true);
+
+		workersInNew.clear();
+		requestResourceFutures.clear();
+
+		return stopSupportingActorsAsync();
+	}
+
+	@Override
+	public void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws Exception {
+		log.info("Shutting down and unregistering as a Mesos framework.");
+
+		Exception exception = null;
+
+		try {
+			// unregister the framework, which implicitly removes all tasks.
+			schedulerDriver.stop(false);
+		} catch (Exception ex) {
+			exception = new Exception("Could not unregister the Mesos framework.", ex);
+		}
+
+		try {
+			workerStore.stop(true);
+		} catch (Exception ex) {
+			exception = ExceptionUtils.firstOrSuppressed(
+					new Exception("Could not stop the Mesos worker store.", ex),
+					exception);
+		}
+
+		if (exception != null) {
+			throw new ResourceManagerException("Could not properly shut down the Mesos application.", exception);
+		}
+	}
+
+	@Override
+	public CompletableFuture<RegisteredMesosWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+		Preconditions.checkArgument(Objects.equals(
+				taskExecutorProcessSpec,
+				taskManagerParameters.containeredParameters().getTaskExecutorProcessSpec()));
+		log.info("Starting a new worker.");
+
+		try {
+			// generate new workers into persistent state and launch associated actors
+			// TODO: arbitrary WorkerResourceSpec used here, which should be removed after removing MesosResourceManager.
+			MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newWorker(workerStore.newTaskID(), WorkerResourceSpec.ZERO);
+			workerStore.putWorker(worker);
+
+			final ResourceID resourceId = extractResourceID(worker.taskID());
+			workersInNew.put(resourceId, worker);
+
+			final CompletableFuture<RegisteredMesosWorkerNode> requestResourceFuture = new CompletableFuture<>();
+			requestResourceFutures.put(resourceId, requestResourceFuture);
+
+			LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
+
+			log.info("Scheduling Mesos task {} with ({} MB, {} cpus, {} gpus, {} disk MB, {} Mbps).",
+					launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs(),
+					launchable.taskRequest().getScalarRequests().get("gpus"), launchable.taskRequest().getDisk(), launchable.taskRequest().getNetworkMbps());
+
+			// tell the task monitor about the new plans
+			taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
+
+			// tell the launch coordinator to launch the new tasks
+			launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList(launchable)), selfActor);
+
+			return requestResourceFuture;
+		} catch (Exception ex) {
+			final ResourceManagerException exception =  new ResourceManagerException("Unable to request new workers.", ex);
+			getResourceEventHandler().onError(exception);
+			return FutureUtils.completedExceptionally(exception);
+		}
+	}
+
+	@Override
+	public void releaseResource(RegisteredMesosWorkerNode workerNode) {
+		try {
+			// update persistent state of worker to Released
+			MesosWorkerStore.Worker worker = workerNode.getWorker();
+			worker = worker.releaseWorker();
+			workerStore.putWorker(worker);
+
+			taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
+
+			if (worker.hostname().isDefined()) {
+				// tell the launch coordinator that the task is being unassigned from the host, for planning purposes
+				launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), selfActor);
+			}
+		} catch (Exception e) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to release a worker.", e));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Mesos Specific
+	// ------------------------------------------------------------------------
+
+	private void registered(Registered message) {
+		connectionMonitor.tell(message, selfActor);
+		try {
+			workerStore.setFrameworkID(Option.apply(message.frameworkId()));
+		} catch (Exception ex) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to store the assigned framework ID.", ex));
+			return;
+		}
+
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when reconnected to Mesos following a failover event.
+	 */
+	private void reregistered(ReRegistered message) {
+		connectionMonitor.tell(message, selfActor);
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when disconnected from Mesos.
+	 */
+	private void disconnected(Disconnected message) {
+		connectionMonitor.tell(message, selfActor);
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when resource offers are made to the framework.
+	 */
+	private void resourceOffers(ResourceOffers message) {
+		launchCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when resource offers are rescinded.
+	 */
+	private void offerRescinded(OfferRescinded message) {
+		launchCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Handles a task status update from Mesos.
+	 */
+	private void statusUpdate(StatusUpdate message) {
+		taskMonitor.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		schedulerDriver.acknowledgeStatusUpdate(message.status());
+	}
+
+	/**
+	 * Accept offers as advised by the launch coordinator.
+	 *
+	 * <p>Acceptance is routed through the RM to update the persistent state before
+	 * forwarding the message to Mesos.
+	 */
+	private void acceptOffers(AcceptOffers msg) {
+		try {
+			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size());
+
+			// transition the persistent state of some tasks to Launched
+			for (Protos.Offer.Operation op : msg.operations()) {
+				if (op.getType() == Protos.Offer.Operation.Type.LAUNCH) {
+					for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
+
+						final ResourceID resourceId = extractResourceID(info.getTaskId());
+
+						MesosWorkerStore.Worker worker = workersInNew.remove(resourceId);
+						assert (worker != null);
+
+						worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
+						workerStore.putWorker(worker);
+
+						final CompletableFuture<RegisteredMesosWorkerNode> requestResourceFuture = requestResourceFutures.remove(resourceId);
+						assert (requestResourceFuture != null);
+
+						requestResourceFuture.complete(new RegisteredMesosWorkerNode(worker));
+
+						log.info("Launching Mesos task {} on host {}.",
+								worker.taskID().getValue(), worker.hostname().get());
+
+						toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+					}
+				}
+			}
+
+			// tell the task monitor about the new plans
+			for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
+				taskMonitor.tell(update, selfActor);
+			}
+
+			// send the acceptance message to Mesos
+			schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
+		} catch (Exception ex) {
+			getResourceEventHandler().onError(new ResourceManagerException("unable to accept offers", ex));
+		}
+	}
+
+	/**
+	 * Handles a reconciliation request from a task monitor.
+	 */
+	private void reconcile(ReconciliationCoordinator.Reconcile message) {
+		// forward to the reconciliation coordinator
+		reconciliationCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Handles a termination notification from a task monitor.
+	 */
+	private void taskTerminated(TaskMonitor.TaskTerminated message) {
+		Protos.TaskID taskID = message.taskID();
+		Protos.TaskStatus status = message.status();
+
+		// note: this callback occurs for failed containers and for released containers alike
+		final ResourceID id = extractResourceID(taskID);
+
+		boolean existed;
+		try {
+			existed = workerStore.removeWorker(taskID);
+		} catch (Exception ex) {
+			getResourceEventHandler().onError(new ResourceManagerException("unable to remove worker", ex));
+			return;
+		}
+
+		if (!existed) {
+			log.info("Received a termination notice for an unrecognized worker: {}", id);
+			return;
+		}
+
+		// check if this is a failed task or a released task
+		assert(!workersInNew.containsKey(id));
+
+		final String diagnostics = extractTerminatedDiagnostics(id, status);
+		getResourceEventHandler().onWorkerTerminated(id, diagnostics);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Internal
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Fetches framework/worker information persisted by a prior incarnation of the RM.
+	 */
+	private CompletableFuture<List<MesosWorkerStore.Worker>> getWorkersAsync() {
+		// if this resource manager is recovering from failure,
+		// then some worker tasks are most likely still alive and we can re-obtain them
+		return CompletableFuture.supplyAsync(() -> {
+			try {
+				final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+				for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+					if (worker.state() == MesosWorkerStore.WorkerState.New) {
+						// remove new workers because allocation requests are transient
+						workerStore.removeWorker(worker.taskID());
+					}
+				}
+				return tasksFromPreviousAttempts;
+			} catch (final Exception e) {
+				throw new CompletionException(new ResourceManagerException(e));
+			}
+		});

Review comment:
       This will be executed on the global thread pool `ForkJoinPool#commonPool()`, which is not ideal since `workerStore.recoverWorkers` can block.
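       One way to avoid this would be to hand `supplyAsync` a dedicated executor. A minimal sketch, assuming the driver has an I/O executor available (`ioExecutor` is a hypothetical reference, not part of the PR):

```java
private CompletableFuture<List<MesosWorkerStore.Worker>> getWorkersAsync() {
	// Run the blocking recovery on a dedicated I/O executor instead of
	// ForkJoinPool.commonPool().
	return CompletableFuture.supplyAsync(() -> {
		try {
			final List<MesosWorkerStore.Worker> tasks = workerStore.recoverWorkers();
			for (final MesosWorkerStore.Worker worker : tasks) {
				if (worker.state() == MesosWorkerStore.WorkerState.New) {
					// remove new workers because allocation requests are transient
					workerStore.removeWorker(worker.taskID());
				}
			}
			return tasks;
		} catch (final Exception e) {
			throw new CompletionException(new ResourceManagerException(e));
		}
	}, ioExecutor);
}
```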

##########
File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerDriver.java
##########
@@ -0,0 +1,713 @@
+	}
+
+	/**
+	 * Recovers given framework/worker information.
+	 *
+	 * @see #getWorkersAsync()
+	 */
+	private void recoverWorkers(final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts) {
+		assertStateCleared();
+
+		if (!tasksFromPreviousAttempts.isEmpty()) {
+			log.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
+
+			List<MesosWorkerStore.Worker> launchedWorkers = tasksFromPreviousAttempts
+					.stream()
+					.peek(worker -> taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor))
+					.filter(worker -> worker.state() == MesosWorkerStore.WorkerState.Launched)
+					.collect(Collectors.toList());
+
+			// tell the launch coordinator about prior assignments
+			List<Tuple2<TaskRequest, String>> toAssign = launchedWorkers
+					.stream()
+					.map(worker -> new Tuple2<>(createLaunchableMesosWorker(worker.taskID()).taskRequest(), worker.hostname().get()))
+					.collect(Collectors.toList());
+			launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), selfActor);
+
+			// notify resource event handler about recovered workers
+			getResourceEventHandler().onPreviousAttemptWorkersRecovered(launchedWorkers
+					.stream()
+					.map(RegisteredMesosWorkerNode::new)
+					.collect(Collectors.toList()));
+		}
+	}
+
+	private CompletableFuture<Void> stopSupportingActorsAsync() {
+		FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);
+
+		CompletableFuture<Boolean> stopTaskMonitorFuture = actorFactory.stopActor(taskMonitor, stopTimeout);
+		taskMonitor = null;
+
+		CompletableFuture<Boolean> stopConnectionMonitorFuture = actorFactory.stopActor(connectionMonitor, stopTimeout);
+		connectionMonitor = null;
+
+		CompletableFuture<Boolean> stopLaunchCoordinatorFuture = actorFactory.stopActor(launchCoordinator, stopTimeout);
+		launchCoordinator = null;
+
+		CompletableFuture<Boolean> stopReconciliationCoordinatorFuture = actorFactory.stopActor(reconciliationCoordinator, stopTimeout);
+		reconciliationCoordinator = null;
+
+		return CompletableFuture.allOf(
+				stopTaskMonitorFuture,
+				stopConnectionMonitorFuture,
+				stopLaunchCoordinatorFuture,
+				stopReconciliationCoordinatorFuture);
+	}
+
+	/**
+	 * Creates a launchable task for Fenzo to process.
+	 */
+	private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
+		log.debug("LaunchableMesosWorker parameters: {}", taskManagerParameters);
+
+		return new LaunchableMesosWorker(
+				artifactServer,
+				taskManagerParameters,
+				taskManagerContainerSpec,
+				taskID,
+				mesosConfig);
+	}
+
+	/**
+	 * Extracts a unique ResourceID from the Mesos task.
+	 *
+	 * @param taskId the Mesos TaskID
+	 * @return The ResourceID for the container
+	 */
+	private static ResourceID extractResourceID(Protos.TaskID taskId) {
+		return new ResourceID(taskId.getValue());
+	}
+
+	/**
+	 * Extracts the Mesos task goal state from the worker information.
+	 *
+	 * @param worker the persistent worker information.
+	 * @return goal state information for the {@link TaskMonitor}.
+	 */
+	private static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
+		switch(worker.state()) {
+			case New: return new TaskMonitor.New(worker.taskID());
+			case Launched: return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
+			case Released: return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
+			default: throw new IllegalArgumentException("unsupported worker state");
+		}
+	}
+
+	private static String extractTerminatedDiagnostics(ResourceID id, Protos.TaskStatus status) {
+		return String.format("Worker %s terminated with status: %s, reason: %s, message: %s.",
+				id, status.getState(), status.getReason(), status.getMessage());
+	}
+
+	private static TaskSchedulerBuilder createOptimizer() {
+		return new TaskSchedulerBuilderImpl();
+	}
+
+	private void assertStateCleared() {
+		assert(workersInNew.isEmpty());
+		assert(requestResourceFutures.isEmpty());
+	}
+	// ------------------------------------------------------------------------
+	//  Internal Classes
+	// ------------------------------------------------------------------------
+
+	private class MesosResourceManagerSchedulerCallback implements Scheduler {
+
+		@Override
+		public void registered(SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
+			getMainThreadExecutor().execute(() -> MesosResourceManagerDriver.this.registered(new Registered(frameworkId, masterInfo)));
+		}
+
+		@Override
+		public void reregistered(SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
+			getMainThreadExecutor().execute(() -> MesosResourceManagerDriver.this.reregistered(new ReRegistered(masterInfo)));
+		}
+
+		@Override
+		public void resourceOffers(SchedulerDriver driver, final List<Protos.Offer> offers) {
+			getMainThreadExecutor().execute(() -> MesosResourceManagerDriver.this.resourceOffers(new ResourceOffers(offers)));
+		}
+
+		@Override
+		public void offerRescinded(SchedulerDriver driver, final Protos.OfferID offerId) {
+			getMainThreadExecutor().execute(() -> MesosResourceManagerDriver.this.offerRescinded(new OfferRescinded(offerId)));
+		}
+
+		@Override
+		public void statusUpdate(SchedulerDriver driver, final Protos.TaskStatus status) {
+			getMainThreadExecutor().execute(() -> MesosResourceManagerDriver.this.statusUpdate(new StatusUpdate(status)));
+		}
+
+		@Override
+		public void frameworkMessage(SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] data) {
+			// noop
+		}
+
+		@Override
+		public void disconnected(SchedulerDriver driver) {
+			getMainThreadExecutor().executeWithoutFencing(() -> MesosResourceManagerDriver.this.disconnected(new Disconnected()));

Review comment:
       I don't fully understand why this message needs to be executed without fencing. If I am not mistaken, we stop all supporting actors as well as the scheduler when the `MesosResourceManagerDriver` loses leadership. Moreover, when gaining leadership we create new actors and a new `SchedulerDriver`.
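       If disconnect callbacks can indeed only arrive while the driver holds leadership, the fenced variant would match the other scheduler callbacks. A sketch under that assumption:

```java
@Override
public void disconnected(SchedulerDriver driver) {
	// Fenced execution, consistent with registered()/statusUpdate()/etc.,
	// assuming no callback outlives the current leadership session.
	getMainThreadExecutor().execute(
			() -> MesosResourceManagerDriver.this.disconnected(new Disconnected()));
}
```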

##########
File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerDriver.java
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.mesos.runtime.clusterframework.actors.MesosResourceManagerActorFactory;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedAbstractActor;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Mesos deployment.
+ */
+public class MesosResourceManagerDriver extends AbstractResourceManagerDriver<RegisteredMesosWorkerNode> {
+
+	/** The Mesos configuration (master and framework info). */
+	private final MesosConfiguration mesosConfig;
+
+	/** The Mesos services needed by the resource manager. */
+	private final MesosServices mesosServices;
+
+	/** The TaskManager container parameters (like container memory size). */
+	private final MesosTaskManagerParameters taskManagerParameters;
+
+	/** Container specification for launching a TM. */
+	private final ContainerSpecification taskManagerContainerSpec;
+
+	/** Server for HTTP artifacts. */
+	private final MesosArtifactServer artifactServer;
+
+	/** Persistent storage of allocated containers. */
+	private MesosWorkerStore workerStore;
+
+	/** Factory for creating local actors. */
+	private final MesosResourceManagerActorFactory actorFactory;
+
+	/** Web url to show in mesos page. */
+	@Nullable
+	private final String webUiUrl;
+
+	/** Mesos scheduler driver. */
+	private SchedulerDriver schedulerDriver;
+
+	/** an adapter to receive messages from Akka actors. */
+	private ActorRef selfActor;
+
+	private ActorRef connectionMonitor;
+
+	private ActorRef taskMonitor;
+
+	private ActorRef launchCoordinator;
+
+	private ActorRef reconciliationCoordinator;
+
+	/** Workers that are requested but not yet offered. */
+	private final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
+	private final Map<ResourceID, CompletableFuture<RegisteredMesosWorkerNode>> requestResourceFutures;
+
+	private MesosConfiguration initializedMesosConfig;
+
+	public MesosResourceManagerDriver(
+			Configuration flinkConfig,
+			MesosServices mesosServices,
+			MesosConfiguration mesosConfig,
+			MesosTaskManagerParameters taskManagerParameters,
+			ContainerSpecification taskManagerContainerSpec,
+			@Nullable String webUiUrl) {
+		super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+		this.mesosServices = Preconditions.checkNotNull(mesosServices);
+		this.actorFactory = Preconditions.checkNotNull(mesosServices.createMesosResourceManagerActorFactory());
+
+		this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
+
+		this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
+
+		this.taskManagerParameters = Preconditions.checkNotNull(taskManagerParameters);
+		this.taskManagerContainerSpec = Preconditions.checkNotNull(taskManagerContainerSpec);
+		this.webUiUrl = webUiUrl;
+
+		this.workersInNew = new HashMap<>();
+		this.requestResourceFutures = new HashMap<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  ResourceManagerDriver
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected void initializeInternal() throws Exception {
+		// create and start the worker store
+		try {
+			this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig);
+			workerStore.start();
+		} catch (Exception e) {
+			throw new ResourceManagerException("Unable to initialize the worker store.", e);
+		}
+
+		// Prepare to register with Mesos
+		Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
+				.clone()
+				.setCheckpoint(true);
+		if (webUiUrl != null) {
+			frameworkInfo.setWebuiUrl(webUiUrl);
+		}
+
+		try {
+			Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
+			if (frameworkID.isEmpty()) {
+				log.info("Registering as new framework.");
+			} else {
+				log.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
+				frameworkInfo.setId(frameworkID.get());
+			}
+		} catch (Exception e) {
+			throw new ResourceManagerException("Unable to recover the framework ID.", e);
+		}
+
+		initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+		MesosConfiguration.logMesosConfig(log, initializedMesosConfig);
+
+		this.selfActor = actorFactory.createSelfActorForMesosResourceManagerDriver(this);
+
+		// configure the artifact server to serve the TM container artifacts
+		try {
+			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
+		}
+		catch (IOException e) {
+			throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
+		}
+	}
+
+	@Override
+	public CompletableFuture<Void> terminate() {
+		return stopSupportingActorsAsync();
+	}
+
+	@Override
+	public CompletableFuture<Void> onGrantLeadership() {
+		Preconditions.checkState(initializedMesosConfig != null);
+
+		schedulerDriver = mesosServices.createMesosSchedulerDriver(
+				initializedMesosConfig,
+				new MesosResourceManagerSchedulerCallback(),
+				false);
+
+		// create supporting actors
+		connectionMonitor = actorFactory.createConnectionMonitor(flinkConfig);
+		launchCoordinator = actorFactory.createLaunchCoordinator(flinkConfig, selfActor, schedulerDriver, createOptimizer());
+		reconciliationCoordinator = actorFactory.createReconciliationCoordinator(flinkConfig, schedulerDriver);
+		taskMonitor = actorFactory.createTaskMonitor(flinkConfig, selfActor, schedulerDriver);
+
+		return getWorkersAsync().thenApplyAsync((tasksFromPreviousAttempts) -> {
+			// recover state
+			recoverWorkers(tasksFromPreviousAttempts);
+
+			// begin scheduling
+			connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+			schedulerDriver.start();
+
+			log.info("Mesos resource manager started.");
+			return null;
+		}, getMainThreadExecutor());
+	}
+
+	@Override
+	public CompletableFuture<Void> onRevokeLeadership() {
+		schedulerDriver.stop(true);
+
+		workersInNew.clear();
+		requestResourceFutures.clear();
+
+		return stopSupportingActorsAsync();
+	}
+
+	@Override
+	public void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws Exception {
+		log.info("Shutting down and unregistering as a Mesos framework.");
+
+		Exception exception = null;
+
+		try {
+			// unregister the framework, which implicitly removes all tasks.
+			schedulerDriver.stop(false);
+		} catch (Exception ex) {
+			exception = new Exception("Could not unregister the Mesos framework.", ex);
+		}
+
+		try {
+			workerStore.stop(true);
+		} catch (Exception ex) {
+			exception = ExceptionUtils.firstOrSuppressed(
+					new Exception("Could not stop the Mesos worker store.", ex),
+					exception);
+		}
+
+		if (exception != null) {
+			throw new ResourceManagerException("Could not properly shut down the Mesos application.", exception);
+		}
+	}
+
+	@Override
+	public CompletableFuture<RegisteredMesosWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+		Preconditions.checkArgument(Objects.equals(
+				taskExecutorProcessSpec,
+				taskManagerParameters.containeredParameters().getTaskExecutorProcessSpec()));
+		log.info("Starting a new worker.");
+
+		try {
+			// generate new workers into persistent state and launch associated actors
+			// TODO: arbitrary WorkerResourceSpec used here, which should be removed after removing MesosResourceManager.
+			MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newWorker(workerStore.newTaskID(), WorkerResourceSpec.ZERO);
+			workerStore.putWorker(worker);
+
+			final ResourceID resourceId = extractResourceID(worker.taskID());
+			workersInNew.put(resourceId, worker);
+
+			final CompletableFuture<RegisteredMesosWorkerNode> requestResourceFuture = new CompletableFuture<>();
+			requestResourceFutures.put(resourceId, requestResourceFuture);
+
+			LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
+
+			log.info("Scheduling Mesos task {} with ({} MB, {} cpus, {} gpus, {} disk MB, {} Mbps).",
+					launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs(),
+					launchable.taskRequest().getScalarRequests().get("gpus"), launchable.taskRequest().getDisk(), launchable.taskRequest().getNetworkMbps());
+
+			// tell the task monitor about the new plans
+			taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
+
+			// tell the launch coordinator to launch the new tasks
+			launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList(launchable)), selfActor);
+
+			return requestResourceFuture;
+		} catch (Exception ex) {
+			final ResourceManagerException exception = new ResourceManagerException("Unable to request new workers.", ex);
+			getResourceEventHandler().onError(exception);
+			return FutureUtils.completedExceptionally(exception);
+		}
+	}
+
+	@Override
+	public void releaseResource(RegisteredMesosWorkerNode workerNode) {
+		try {
+			// update persistent state of worker to Released
+			MesosWorkerStore.Worker worker = workerNode.getWorker();
+			worker = worker.releaseWorker();
+			workerStore.putWorker(worker);
+
+			taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
+
+			if (worker.hostname().isDefined()) {
+				// tell the launch coordinator that the task is being unassigned from the host, for planning purposes
+				launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), selfActor);
+			}
+		} catch (Exception e) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to release a worker.", e));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Mesos Specific
+	// ------------------------------------------------------------------------
+
+	private void registered(Registered message) {
+		connectionMonitor.tell(message, selfActor);
+		try {
+			workerStore.setFrameworkID(Option.apply(message.frameworkId()));
+		} catch (Exception ex) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to store the assigned framework ID.", ex));
+			return;
+		}
+
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when reconnected to Mesos following a failover event.
+	 */
+	private void reregistered(ReRegistered message) {
+		connectionMonitor.tell(message, selfActor);
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when disconnected from Mesos.
+	 */
+	private void disconnected(Disconnected message) {
+		connectionMonitor.tell(message, selfActor);
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when resource offers are made to the framework.
+	 */
+	private void resourceOffers(ResourceOffers message) {
+		launchCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when resource offers are rescinded.
+	 */
+	private void offerRescinded(OfferRescinded message) {
+		launchCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Handles a task status update from Mesos.
+	 */
+	private void statusUpdate(StatusUpdate message) {
+		taskMonitor.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		schedulerDriver.acknowledgeStatusUpdate(message.status());
+	}
+
+	/**
+	 * Accept offers as advised by the launch coordinator.
+	 *
+	 * <p>Acceptance is routed through the RM to update the persistent state before
+	 * forwarding the message to Mesos.
+	 */
+	private void acceptOffers(AcceptOffers msg) {
+		try {
+			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size());
+
+			// transition the persistent state of some tasks to Launched
+			for (Protos.Offer.Operation op : msg.operations()) {
+				if (op.getType() == Protos.Offer.Operation.Type.LAUNCH) {
+					for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
+
+						final ResourceID resourceId = extractResourceID(info.getTaskId());
+
+						MesosWorkerStore.Worker worker = workersInNew.remove(resourceId);
+						assert (worker != null);
+
+						worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
+						workerStore.putWorker(worker);
+
+						final CompletableFuture<RegisteredMesosWorkerNode> requestResourceFuture = requestResourceFutures.remove(resourceId);
+						assert (requestResourceFuture != null);
+
+						requestResourceFuture.complete(new RegisteredMesosWorkerNode(worker));
+
+						log.info("Launching Mesos task {} on host {}.",
+								worker.taskID().getValue(), worker.hostname().get());
+
+						toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+					}
+				}
+			}
+
+			// tell the task monitor about the new plans
+			for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
+				taskMonitor.tell(update, selfActor);
+			}
+
+			// send the acceptance message to Mesos
+			schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
+		} catch (Exception ex) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to accept offers.", ex));
+		}
+	}
+
+	/**
+	 * Handles a reconciliation request from a task monitor.
+	 */
+	private void reconcile(ReconciliationCoordinator.Reconcile message) {
+		// forward to the reconciliation coordinator
+		reconciliationCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Handles a termination notification from a task monitor.
+	 */
+	private void taskTerminated(TaskMonitor.TaskTerminated message) {
+		Protos.TaskID taskID = message.taskID();
+		Protos.TaskStatus status = message.status();
+
+		// note: this callback occurs for failed containers and for released containers alike
+		final ResourceID id = extractResourceID(taskID);
+
+		boolean existed;
+		try {
+			existed = workerStore.removeWorker(taskID);
+		} catch (Exception ex) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to remove worker.", ex));
+			return;
+		}
+
+		if (!existed) {
+			log.info("Received a termination notice for an unrecognized worker: {}.", id);
+			return;
+		}
+
+		// check if this is a failed task or a released task
+		assert(!workersInNew.containsKey(id));
+
+		final String diagnostics = extractTerminatedDiagnostics(id, status);
+		getResourceEventHandler().onWorkerTerminated(id, diagnostics);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Internal
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Fetches framework/worker information persisted by a prior incarnation of the RM.
+	 */
+	private CompletableFuture<List<MesosWorkerStore.Worker>> getWorkersAsync() {
+		// if this resource manager is recovering from failure,
+		// then some worker tasks are most likely still alive and we can re-obtain them
+		return CompletableFuture.supplyAsync(() -> {
+			try {
+				final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+				for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+					if (worker.state() == MesosWorkerStore.WorkerState.New) {
+						// remove new workers because allocation requests are transient
+						workerStore.removeWorker(worker.taskID());
+					}
+				}
+				return tasksFromPreviousAttempts;
+			} catch (final Exception e) {
+				throw new CompletionException(new ResourceManagerException(e));
+			}
+		});

Review comment:
       I think we should not remove the `ioExecutor` that was previously used to run this supply operation; without an explicit executor, `supplyAsync` falls back to the common `ForkJoinPool`, which is not a good place for the blocking worker recovery.

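       A minimal sketch of what I mean, trimmed to the relevant part and assuming
       the driver still has access to the I/O executor (`getIoExecutor()` is my
       assumption here, it is not part of this diff):

           // Run the blocking worker recovery on the dedicated I/O executor
           // instead of letting supplyAsync fall back to ForkJoinPool.commonPool().
           return CompletableFuture.supplyAsync(() -> {
               try {
                   return workerStore.recoverWorkers();
               } catch (Exception e) {
                   throw new CompletionException(new ResourceManagerException(e));
               }
           }, getIoExecutor());
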
##########
File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerDriver.java
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.mesos.runtime.clusterframework.actors.MesosResourceManagerActorFactory;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedAbstractActor;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Mesos deployment.
+ */
+public class MesosResourceManagerDriver extends AbstractResourceManagerDriver<RegisteredMesosWorkerNode> {
+
+	/** The Mesos configuration (master and framework info). */
+	private final MesosConfiguration mesosConfig;
+
+	/** The Mesos services needed by the resource manager. */
+	private final MesosServices mesosServices;
+
+	/** The TaskManager container parameters (like container memory size). */
+	private final MesosTaskManagerParameters taskManagerParameters;
+
+	/** Container specification for launching a TM. */
+	private final ContainerSpecification taskManagerContainerSpec;
+
+	/** Server for HTTP artifacts. */
+	private final MesosArtifactServer artifactServer;
+
+	/** Persistent storage of allocated containers. */
+	private MesosWorkerStore workerStore;
+
+	/** Factory for creating local actors. */
+	private final MesosResourceManagerActorFactory actorFactory;
+
+	/** Web url to show in mesos page. */
+	@Nullable
+	private final String webUiUrl;
+
+	/** Mesos scheduler driver. */
+	private SchedulerDriver schedulerDriver;
+
+	/** An adapter to receive messages from Akka actors. */
+	private ActorRef selfActor;
+
+	private ActorRef connectionMonitor;
+
+	private ActorRef taskMonitor;
+
+	private ActorRef launchCoordinator;
+
+	private ActorRef reconciliationCoordinator;
+
+	/** Workers that are requested but not yet offered. */
+	private final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
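+
+	/** Pending request futures, completed once the corresponding worker task is launched. */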
+	private final Map<ResourceID, CompletableFuture<RegisteredMesosWorkerNode>> requestResourceFutures;
+
+	private MesosConfiguration initializedMesosConfig;
+
+	public MesosResourceManagerDriver(
+			Configuration flinkConfig,
+			MesosServices mesosServices,
+			MesosConfiguration mesosConfig,
+			MesosTaskManagerParameters taskManagerParameters,
+			ContainerSpecification taskManagerContainerSpec,
+			@Nullable String webUiUrl) {
+		super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+		this.mesosServices = Preconditions.checkNotNull(mesosServices);
+		this.actorFactory = Preconditions.checkNotNull(mesosServices.createMesosResourceManagerActorFactory());
+
+		this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
+
+		this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
+
+		this.taskManagerParameters = Preconditions.checkNotNull(taskManagerParameters);
+		this.taskManagerContainerSpec = Preconditions.checkNotNull(taskManagerContainerSpec);
+		this.webUiUrl = webUiUrl;
+
+		this.workersInNew = new HashMap<>();
+		this.requestResourceFutures = new HashMap<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  ResourceManagerDriver
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected void initializeInternal() throws Exception {
+		// create and start the worker store
+		try {
+			this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig);
+			workerStore.start();
+		} catch (Exception e) {
+			throw new ResourceManagerException("Unable to initialize the worker store.", e);
+		}
+
+		// Prepare to register with Mesos
+		Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
+				.clone()
+				.setCheckpoint(true);
+		if (webUiUrl != null) {
+			frameworkInfo.setWebuiUrl(webUiUrl);
+		}
+
+		try {
+			Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
+			if (frameworkID.isEmpty()) {
+				log.info("Registering as new framework.");
+			} else {
+				log.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
+				frameworkInfo.setId(frameworkID.get());
+			}
+		} catch (Exception e) {
+			throw new ResourceManagerException("Unable to recover the framework ID.", e);
+		}
+
+		initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+		MesosConfiguration.logMesosConfig(log, initializedMesosConfig);
+
+		this.selfActor = actorFactory.createSelfActorForMesosResourceManagerDriver(this);
+
+		// configure the artifact server to serve the TM container artifacts
+		try {
+			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
+		} catch (IOException e) {
+			throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
+		}
+	}
+
+	@Override
+	public CompletableFuture<Void> terminate() {
+		return stopSupportingActorsAsync();
+	}
+
+	@Override
+	public CompletableFuture<Void> onGrantLeadership() {

Review comment:
       I think at some point we should try not to reuse `ResourceManager` instances across leader sessions. That way we would not have to offer these hooks.

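       Roughly, the lifecycle I have in mind would create a fresh instance per
       leader session instead of toggling a single one. All names below are
       hypothetical, just to illustrate the idea:

           // Hypothetical per-session lifecycle: no onGrant/onRevoke hooks needed.
           final class LeaderSessionLifecycle {
               private final ResourceManagerFactory resourceManagerFactory; // hypothetical factory
               private ResourceManager currentResourceManager;

               LeaderSessionLifecycle(ResourceManagerFactory factory) {
                   this.resourceManagerFactory = factory;
               }

               void grantLeadership(java.util.UUID leaderSessionId) {
                   // A fresh instance per session; no state to re-arm on grant.
                   currentResourceManager = resourceManagerFactory.create(leaderSessionId);
                   currentResourceManager.start();
               }

               void revokeLeadership() {
                   // Dispose of the whole instance instead of clearing its state.
                   currentResourceManager.closeAsync();
                   currentResourceManager = null;
               }
           }
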
##########
File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerDriver.java
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.mesos.runtime.clusterframework.actors.MesosResourceManagerActorFactory;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedAbstractActor;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Mesos deployment.
+ */
+public class MesosResourceManagerDriver extends AbstractResourceManagerDriver<RegisteredMesosWorkerNode> {
+
+	/** The Mesos configuration (master and framework info). */
+	private final MesosConfiguration mesosConfig;
+
+	/** The Mesos services needed by the resource manager. */
+	private final MesosServices mesosServices;
+
+	/** The TaskManager container parameters (like container memory size). */
+	private final MesosTaskManagerParameters taskManagerParameters;
+
+	/** Container specification for launching a TM. */
+	private final ContainerSpecification taskManagerContainerSpec;
+
+	/** Server for HTTP artifacts. */
+	private final MesosArtifactServer artifactServer;
+
+	/** Persistent storage of allocated containers. */
+	private MesosWorkerStore workerStore;
+
+	/** Factory for creating local actors. */
+	private final MesosResourceManagerActorFactory actorFactory;
+
+	/** Web url to show in mesos page. */
+	@Nullable
+	private final String webUiUrl;
+
+	/** Mesos scheduler driver. */
+	private SchedulerDriver schedulerDriver;
+
+	/** An adapter to receive messages from Akka actors. */
+	private ActorRef selfActor;
+
+	private ActorRef connectionMonitor;
+
+	private ActorRef taskMonitor;
+
+	private ActorRef launchCoordinator;
+
+	private ActorRef reconciliationCoordinator;
+
+	/** Workers that are requested but not yet offered. */
+	private final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
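+
+	/** Pending request futures, completed once the corresponding worker task is launched. */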
+	private final Map<ResourceID, CompletableFuture<RegisteredMesosWorkerNode>> requestResourceFutures;
+
+	private MesosConfiguration initializedMesosConfig;
+
+	public MesosResourceManagerDriver(
+			Configuration flinkConfig,
+			MesosServices mesosServices,
+			MesosConfiguration mesosConfig,
+			MesosTaskManagerParameters taskManagerParameters,
+			ContainerSpecification taskManagerContainerSpec,
+			@Nullable String webUiUrl) {
+		super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+		this.mesosServices = Preconditions.checkNotNull(mesosServices);
+		this.actorFactory = Preconditions.checkNotNull(mesosServices.createMesosResourceManagerActorFactory());
+
+		this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
+
+		this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
+
+		this.taskManagerParameters = Preconditions.checkNotNull(taskManagerParameters);
+		this.taskManagerContainerSpec = Preconditions.checkNotNull(taskManagerContainerSpec);
+		this.webUiUrl = webUiUrl;
+
+		this.workersInNew = new HashMap<>();
+		this.requestResourceFutures = new HashMap<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  ResourceManagerDriver
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected void initializeInternal() throws Exception {
+		// create and start the worker store
+		try {
+			this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig);
+			workerStore.start();
+		} catch (Exception e) {
+			throw new ResourceManagerException("Unable to initialize the worker store.", e);
+		}
+
+		// Prepare to register with Mesos
+		Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
+				.clone()
+				.setCheckpoint(true);
+		if (webUiUrl != null) {
+			frameworkInfo.setWebuiUrl(webUiUrl);
+		}
+
+		try {
+			Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
+			if (frameworkID.isEmpty()) {
+				log.info("Registering as new framework.");
+			} else {
+				log.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
+				frameworkInfo.setId(frameworkID.get());
+			}
+		} catch (Exception e) {
+			throw new ResourceManagerException("Unable to recover the framework ID.", e);
+		}
+
+		initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+		MesosConfiguration.logMesosConfig(log, initializedMesosConfig);
+
+		this.selfActor = actorFactory.createSelfActorForMesosResourceManagerDriver(this);
+
+		// configure the artifact server to serve the TM container artifacts
+		try {
+			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
+		} catch (IOException e) {
+			throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
+		}
+	}
+
+	@Override
+	public CompletableFuture<Void> terminate() {
+		return stopSupportingActorsAsync();
+	}
+
+	@Override
+	public CompletableFuture<Void> onGrantLeadership() {
+		Preconditions.checkState(initializedMesosConfig != null);
+
+		schedulerDriver = mesosServices.createMesosSchedulerDriver(
+				initializedMesosConfig,
+				new MesosResourceManagerSchedulerCallback(),
+				false);
+
+		// create supporting actors
+		connectionMonitor = actorFactory.createConnectionMonitor(flinkConfig);
+		launchCoordinator = actorFactory.createLaunchCoordinator(flinkConfig, selfActor, schedulerDriver, createOptimizer());
+		reconciliationCoordinator = actorFactory.createReconciliationCoordinator(flinkConfig, schedulerDriver);
+		taskMonitor = actorFactory.createTaskMonitor(flinkConfig, selfActor, schedulerDriver);
+
+		return getWorkersAsync().thenApplyAsync((tasksFromPreviousAttempts) -> {
+			// recover state
+			recoverWorkers(tasksFromPreviousAttempts);
+
+			// begin scheduling
+			connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+			schedulerDriver.start();
+
+			log.info("Mesos resource manager started.");
+			return null;
+		}, getMainThreadExecutor());
+	}
+
+	@Override
+	public CompletableFuture<Void> onRevokeLeadership() {
+		schedulerDriver.stop(true);
+
+		workersInNew.clear();
+		requestResourceFutures.clear();
+
+		return stopSupportingActorsAsync();
+	}
+
+	@Override
+	public void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws Exception {
+		log.info("Shutting down and unregistering as a Mesos framework.");
+
+		Exception exception = null;
+
+		try {
+			// unregister the framework, which implicitly removes all tasks.
+			schedulerDriver.stop(false);
+		} catch (Exception ex) {
+			exception = new Exception("Could not unregister the Mesos framework.", ex);
+		}
+
+		try {
+			workerStore.stop(true);
+		} catch (Exception ex) {
+			exception = ExceptionUtils.firstOrSuppressed(
+					new Exception("Could not stop the Mesos worker store.", ex),
+					exception);
+		}
+
+		if (exception != null) {
+			throw new ResourceManagerException("Could not properly shut down the Mesos application.", exception);
+		}
+	}
+
+	@Override
+	public CompletableFuture<RegisteredMesosWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+		Preconditions.checkArgument(Objects.equals(
+				taskExecutorProcessSpec,
+				taskManagerParameters.containeredParameters().getTaskExecutorProcessSpec()));
+		log.info("Starting a new worker.");
+
+		try {
+			// generate new workers into persistent state and launch associated actors
+			// TODO: arbitrary WorkerResourceSpec used here, which should be removed after removing MesosResourceManager.
+			MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newWorker(workerStore.newTaskID(), WorkerResourceSpec.ZERO);
+			workerStore.putWorker(worker);
+
+			final ResourceID resourceId = extractResourceID(worker.taskID());
+			workersInNew.put(resourceId, worker);
+
+			final CompletableFuture<RegisteredMesosWorkerNode> requestResourceFuture = new CompletableFuture<>();
+			requestResourceFutures.put(resourceId, requestResourceFuture);
+
+			LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
+
+			log.info("Scheduling Mesos task {} with ({} MB, {} cpus, {} gpus, {} disk MB, {} Mbps).",
+					launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs(),
+					launchable.taskRequest().getScalarRequests().get("gpus"), launchable.taskRequest().getDisk(), launchable.taskRequest().getNetworkMbps());
+
+			// tell the task monitor about the new plans
+			taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
+
+			// tell the launch coordinator to launch the new tasks
+			launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList(launchable)), selfActor);
+
+			return requestResourceFuture;
+		} catch (Exception ex) {
+			final ResourceManagerException exception = new ResourceManagerException("Unable to request new workers.", ex);
+			getResourceEventHandler().onError(exception);
+			return FutureUtils.completedExceptionally(exception);
+		}
+	}
+
+	@Override
+	public void releaseResource(RegisteredMesosWorkerNode workerNode) {
+		try {
+			// update persistent state of worker to Released
+			MesosWorkerStore.Worker worker = workerNode.getWorker();
+			worker = worker.releaseWorker();
+			workerStore.putWorker(worker);
+
+			taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
+
+			if (worker.hostname().isDefined()) {
+				// tell the launch coordinator that the task is being unassigned from the host, for planning purposes
+				launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), selfActor);
+			}
+		} catch (Exception e) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to release a worker.", e));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Mesos Specific
+	// ------------------------------------------------------------------------
+
+	private void registered(Registered message) {
+		connectionMonitor.tell(message, selfActor);
+		try {
+			workerStore.setFrameworkID(Option.apply(message.frameworkId()));
+		} catch (Exception ex) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to store the assigned framework ID.", ex));
+			return;
+		}
+
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when reconnected to Mesos following a failover event.
+	 */
+	private void reregistered(ReRegistered message) {
+		connectionMonitor.tell(message, selfActor);
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when disconnected from Mesos.
+	 */
+	private void disconnected(Disconnected message) {
+		connectionMonitor.tell(message, selfActor);
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when resource offers are made to the framework.
+	 */
+	private void resourceOffers(ResourceOffers message) {
+		launchCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when resource offers are rescinded.
+	 */
+	private void offerRescinded(OfferRescinded message) {
+		launchCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Handles a task status update from Mesos.
+	 */
+	private void statusUpdate(StatusUpdate message) {
+		taskMonitor.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		schedulerDriver.acknowledgeStatusUpdate(message.status());
+	}
+
+	/**
+	 * Accept offers as advised by the launch coordinator.
+	 *
+	 * <p>Acceptance is routed through the RM to update the persistent state before
+	 * forwarding the message to Mesos.
+	 */
+	private void acceptOffers(AcceptOffers msg) {
+		try {
+			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size());
+
+			// transition the persistent state of some tasks to Launched
+			for (Protos.Offer.Operation op : msg.operations()) {
+				if (op.getType() == Protos.Offer.Operation.Type.LAUNCH) {
+					for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
+
+						final ResourceID resourceId = extractResourceID(info.getTaskId());
+
+						MesosWorkerStore.Worker worker = workersInNew.remove(resourceId);
+						assert (worker != null);
+
+						worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
+						workerStore.putWorker(worker);
+
+						final CompletableFuture<RegisteredMesosWorkerNode> requestResourceFuture = requestResourceFutures.remove(resourceId);
+						assert (requestResourceFuture != null);
+
+						requestResourceFuture.complete(new RegisteredMesosWorkerNode(worker));
+
+						log.info("Launching Mesos task {} on host {}.",
+								worker.taskID().getValue(), worker.hostname().get());
+
+						toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+					}
+				}
+			}
+
+			// tell the task monitor about the new plans
+			for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
+				taskMonitor.tell(update, selfActor);
+			}
+
+			// send the acceptance message to Mesos
+			schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
+		} catch (Exception ex) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to accept offers.", ex));
+		}
+	}
+
+	/**
+	 * Handles a reconciliation request from a task monitor.
+	 */
+	private void reconcile(ReconciliationCoordinator.Reconcile message) {
+		// forward to the reconciliation coordinator
+		reconciliationCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Handles a termination notification from a task monitor.
+	 */
+	private void taskTerminated(TaskMonitor.TaskTerminated message) {
+		Protos.TaskID taskID = message.taskID();
+		Protos.TaskStatus status = message.status();
+
+		// note: this callback occurs for failed containers and for released containers alike
+		final ResourceID id = extractResourceID(taskID);
+
+		boolean existed;
+		try {
+			existed = workerStore.removeWorker(taskID);
+		} catch (Exception ex) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to remove worker.", ex));
+			return;
+		}
+
+		if (!existed) {
+			log.info("Received a termination notice for an unrecognized worker: {}.", id);
+			return;
+		}
+
+		// check if this is a failed task or a released task
+		assert(!workersInNew.containsKey(id));
+
+		final String diagnostics = extractTerminatedDiagnostics(id, status);
+		getResourceEventHandler().onWorkerTerminated(id, diagnostics);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Internal
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Fetches framework/worker information persisted by a prior incarnation of the RM.
+	 */
+	private CompletableFuture<List<MesosWorkerStore.Worker>> getWorkersAsync() {
+		// if this resource manager is recovering from failure,
+		// then some worker tasks are most likely still alive and we can re-obtain them
+		return CompletableFuture.supplyAsync(() -> {
+			try {
+				final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+				for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+					if (worker.state() == MesosWorkerStore.WorkerState.New) {
+						// remove new workers because allocation requests are transient
+						workerStore.removeWorker(worker.taskID());
+					}
+				}
+				return tasksFromPreviousAttempts;
+			} catch (final Exception e) {
+				throw new CompletionException(new ResourceManagerException(e));
+			}
+		});
+	}
+
+	/**
+	 * Recovers given framework/worker information.
+	 *
+	 * @see #getWorkersAsync()
+	 */
+	private void recoverWorkers(final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts) {
+		assertStateCleared();
+
+		if (!tasksFromPreviousAttempts.isEmpty()) {
+			log.info("Retrieved {} TaskManagers from previous attempt.", tasksFromPreviousAttempts.size());
+
+			List<MesosWorkerStore.Worker> launchedWorkers = tasksFromPreviousAttempts
+					.stream()
+					.peek(worker -> taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor))
+					.filter(worker -> worker.state() == MesosWorkerStore.WorkerState.Launched)
+					.collect(Collectors.toList());
+
+			// tell the launch coordinator about prior assignments
+			List<Tuple2<TaskRequest, String>> toAssign = launchedWorkers
+					.stream()
+					.map(worker -> new Tuple2<>(createLaunchableMesosWorker(worker.taskID()).taskRequest(), worker.hostname().get()))
+					.collect(Collectors.toList());
+			launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), selfActor);
+
+			// notify resource event handler about recovered workers
+			getResourceEventHandler().onPreviousAttemptWorkersRecovered(launchedWorkers
+					.stream()
+					.map(RegisteredMesosWorkerNode::new)
+					.collect(Collectors.toList()));
+		}
+	}
+
+	private CompletableFuture<Void> stopSupportingActorsAsync() {
+		FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);
+
+		CompletableFuture<Boolean> stopTaskMonitorFuture = actorFactory.stopActor(taskMonitor, stopTimeout);
+		taskMonitor = null;
+
+		CompletableFuture<Boolean> stopConnectionMonitorFuture = actorFactory.stopActor(connectionMonitor, stopTimeout);
+		connectionMonitor = null;
+
+		CompletableFuture<Boolean> stopLaunchCoordinatorFuture = actorFactory.stopActor(launchCoordinator, stopTimeout);
+		launchCoordinator = null;
+
+		CompletableFuture<Boolean> stopReconciliationCoordinatorFuture = actorFactory.stopActor(reconciliationCoordinator, stopTimeout);
+		reconciliationCoordinator = null;
+
+		return CompletableFuture.allOf(
+				stopTaskMonitorFuture,
+				stopConnectionMonitorFuture,
+				stopLaunchCoordinatorFuture,
+				stopReconciliationCoordinatorFuture);
+	}
+
+	/**
+	 * Creates a launchable task for Fenzo to process.
+	 */
+	private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
+		log.debug("LaunchableMesosWorker parameters: {}", taskManagerParameters);
+
+		return new LaunchableMesosWorker(
+				artifactServer,
+				taskManagerParameters,
+				taskManagerContainerSpec,
+				taskID,
+				mesosConfig);
+	}
+
+	/**
+	 * Extracts a unique ResourceID from the Mesos task.
+	 *
+	 * @param taskId the Mesos TaskID
+	 * @return The ResourceID for the container
+	 */
+	private static ResourceID extractResourceID(Protos.TaskID taskId) {
+		return new ResourceID(taskId.getValue());
+	}
+
+	/**
+	 * Extracts the Mesos task goal state from the worker information.
+	 *
+	 * @param worker the persistent worker information.
+	 * @return goal state information for the {@link TaskMonitor}.
+	 */
+	private static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
+		switch (worker.state()) {
+			case New: return new TaskMonitor.New(worker.taskID());
+			case Launched: return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
+			case Released: return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
+			default: throw new IllegalArgumentException("unsupported worker state");
+		}
+	}
+
+	private static String extractTerminatedDiagnostics(ResourceID id, Protos.TaskStatus status) {
+		return String.format("Worker %s terminated with status: %s, reason: %s, message: %s.",
+				id, status.getState(), status.getReason(), status.getMessage());
+	}
+
+	private static TaskSchedulerBuilder createOptimizer() {
+		return new TaskSchedulerBuilderImpl();
+	}
+
+	private void assertStateCleared() {
+		assert(workersInNew.isEmpty());
+		assert(requestResourceFutures.isEmpty());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Internal Classes
+	// ------------------------------------------------------------------------
+
+	private class MesosResourceManagerSchedulerCallback implements Scheduler {
+
+		@Override
+		public void registered(SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
+			getMainThreadExecutor().execute(() -> MesosResourceManagerDriver.this.registered(new Registered(frameworkId, masterInfo)));
+		}
+
+		@Override
+		public void reregistered(SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
+			getMainThreadExecutor().execute(() -> MesosResourceManagerDriver.this.reregistered(new ReRegistered(masterInfo)));
+		}
+
+		@Override
+		public void resourceOffers(SchedulerDriver driver, final List<Protos.Offer> offers) {
+			getMainThreadExecutor().execute(() -> MesosResourceManagerDriver.this.resourceOffers(new ResourceOffers(offers)));
+		}
+
+		@Override
+		public void offerRescinded(SchedulerDriver driver, final Protos.OfferID offerId) {
+			getMainThreadExecutor().execute(() -> MesosResourceManagerDriver.this.offerRescinded(new OfferRescinded(offerId)));
+		}
+
+		@Override
+		public void statusUpdate(SchedulerDriver driver, final Protos.TaskStatus status) {
+			getMainThreadExecutor().execute(() -> MesosResourceManagerDriver.this.statusUpdate(new StatusUpdate(status)));
+		}
+
+		@Override
+		public void frameworkMessage(SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] data) {
+			// noop
+		}
+
+		@Override
+		public void disconnected(SchedulerDriver driver) {
+			getMainThreadExecutor().executeWithoutFencing(() -> MesosResourceManagerDriver.this.disconnected(new Disconnected()));

Review comment:
       I actually think that this is an artifact of the fix for FLINK-9936, where we first tried to keep the supporting actors alive during a loss of leadership.

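       In other words, once the supporting actors are reliably stopped on leadership
       revocation, this callback could arguably go through the fenced path like the
       other callbacks do. A sketch of that simplification, not verified against the
       fencing semantics here:

           @Override
           public void disconnected(SchedulerDriver driver) {
               // Same pattern as the other scheduler callbacks above.
               getMainThreadExecutor().execute(
                       () -> MesosResourceManagerDriver.this.disconnected(new Disconnected()));
           }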



----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org