You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 10:58:49 UTC

[flink] branch master updated: [FLINK-11412] [mesos] Remove legacy MesosFlinkResourceManager

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a43d3c8  [FLINK-11412] [mesos] Remove legacy MesosFlinkResourceManager
a43d3c8 is described below

commit a43d3c85884de1f45d4884b26265a53a69d6bee7
Author: tison <wa...@gmail.com>
AuthorDate: Tue Jan 29 11:54:37 2019 +0800

    [FLINK-11412] [mesos] Remove legacy MesosFlinkResourceManager
---
 .../src/main/resources/log4j.properties            |   1 -
 .../MesosFlinkResourceManager.java                 | 795 ---------------------
 .../RegisteredMesosWorkerNode.java                 |   2 +-
 flink-mesos/src/main/resources/log4j.properties    |   1 -
 .../MesosFlinkResourceManagerTest.java             | 785 --------------------
 .../clusterframework/MesosResourceManagerTest.java |   4 +-
 6 files changed, 3 insertions(+), 1585 deletions(-)

diff --git a/flink-container/src/main/resources/log4j.properties b/flink-container/src/main/resources/log4j.properties
index 62cb6ed..dcbb808 100644
--- a/flink-container/src/main/resources/log4j.properties
+++ b/flink-container/src/main/resources/log4j.properties
@@ -24,4 +24,3 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 log4j.logger.org.apache.flink.mesos=DEBUG
-log4j.logger.org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager=INFO
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
deleted file mode 100644
index dcf6a7e..0000000
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ /dev/null
@@ -1,795 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.runtime.clusterframework;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.mesos.configuration.MesosOptions;
-import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
-import org.apache.flink.mesos.scheduler.ConnectionMonitor;
-import org.apache.flink.mesos.scheduler.LaunchCoordinator;
-import org.apache.flink.mesos.scheduler.LaunchableTask;
-import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
-import org.apache.flink.mesos.scheduler.SchedulerProxy;
-import org.apache.flink.mesos.scheduler.TaskMonitor;
-import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
-import org.apache.flink.mesos.scheduler.Tasks;
-import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
-import org.apache.flink.mesos.scheduler.messages.Disconnected;
-import org.apache.flink.mesos.scheduler.messages.Error;
-import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
-import org.apache.flink.mesos.scheduler.messages.ReRegistered;
-import org.apache.flink.mesos.scheduler.messages.Registered;
-import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
-import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.flink.mesos.util.MesosArtifactResolver;
-import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
-import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
-import org.apache.flink.runtime.clusterframework.messages.StopCluster;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import com.netflix.fenzo.TaskRequest;
-import com.netflix.fenzo.TaskScheduler;
-import com.netflix.fenzo.VirtualMachineLease;
-import com.netflix.fenzo.functions.Action1;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Protos.FrameworkInfo.Capability;
-import org.apache.mesos.SchedulerDriver;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import scala.Option;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Flink Resource Manager for Apache Mesos.
- */
-public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {
-
-	/** The Mesos configuration (master and framework info). */
-	private final MesosConfiguration mesosConfig;
-
-	/** The TaskManager container parameters (like container memory size). */
-	private final MesosTaskManagerParameters taskManagerParameters;
-
-	/** Container specification for launching a TM. */
-	private final ContainerSpecification taskManagerContainerSpec;
-
-	/** Resolver for HTTP artifacts. **/
-	private final MesosArtifactResolver artifactResolver;
-
-	/** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
-	private final int maxFailedTasks;
-
-	/** Callback handler for the asynchronous Mesos scheduler. */
-	private SchedulerProxy schedulerCallbackHandler;
-
-	/** Mesos scheduler driver. */
-	private SchedulerDriver schedulerDriver;
-
-	private ActorRef connectionMonitor;
-
-	private ActorRef taskRouter;
-
-	private ActorRef launchCoordinator;
-
-	private ActorRef reconciliationCoordinator;
-
-	private final MesosWorkerStore workerStore;
-
-	/** planning state related to workers - package private for unit test purposes. */
-	final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
-	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
-	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
-
-	/** The number of failed tasks since the master became active. */
-	private int failedTasksSoFar;
-
-	public MesosFlinkResourceManager(
-		Configuration flinkConfig,
-		MesosConfiguration mesosConfig,
-		MesosWorkerStore workerStore,
-		LeaderRetrievalService leaderRetrievalService,
-		MesosTaskManagerParameters taskManagerParameters,
-		ContainerSpecification taskManagerContainerSpec,
-		MesosArtifactResolver artifactResolver,
-		int maxFailedTasks,
-		int numInitialTaskManagers) {
-
-		super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
-
-		this.mesosConfig = requireNonNull(mesosConfig);
-
-		this.workerStore = requireNonNull(workerStore);
-		this.artifactResolver = requireNonNull(artifactResolver);
-
-		this.taskManagerParameters = requireNonNull(taskManagerParameters);
-		this.taskManagerContainerSpec = requireNonNull(taskManagerContainerSpec);
-		this.maxFailedTasks = maxFailedTasks;
-
-		this.workersInNew = new HashMap<>();
-		this.workersInLaunch = new HashMap<>();
-		this.workersBeingReturned = new HashMap<>();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Mesos-specific behavior
-	// ------------------------------------------------------------------------
-
-	@Override
-	protected void initialize() throws Exception {
-		LOG.info("Initializing Mesos resource master");
-
-		workerStore.start();
-
-		// create the scheduler driver to communicate with Mesos
-		schedulerCallbackHandler = new SchedulerProxy(self());
-
-		// register with Mesos
-		FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
-			.clone()
-			.setCheckpoint(true);
-
-		Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
-		if (frameworkID.isEmpty()) {
-			LOG.info("Registering as new framework.");
-		}
-		else {
-			LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
-			frameworkInfo.setId(frameworkID.get());
-		}
-
-		if (taskManagerParameters.gpus() > 0) {
-			LOG.info("Add GPU_RESOURCES capability to framework");
-			frameworkInfo.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES));
-		}
-
-		MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
-		MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
-		schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
-
-		// create supporting actors
-		connectionMonitor = createConnectionMonitor();
-		launchCoordinator = createLaunchCoordinator();
-		reconciliationCoordinator = createReconciliationCoordinator();
-		taskRouter = createTaskRouter();
-
-		recoverWorkers();
-
-		connectionMonitor.tell(new ConnectionMonitor.Start(), self());
-		schedulerDriver.start();
-	}
-
-	protected ActorRef createConnectionMonitor() {
-		return context().actorOf(
-			ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
-			"connectionMonitor");
-	}
-
-	protected ActorRef createTaskRouter() {
-		return context().actorOf(
-			Tasks.createActorProps(Tasks.class, self(), config, schedulerDriver, TaskMonitor.class),
-			"tasks");
-	}
-
-	protected ActorRef createLaunchCoordinator() {
-		return context().actorOf(
-			LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, schedulerDriver, createOptimizer()),
-			"launchCoordinator");
-	}
-
-	protected ActorRef createReconciliationCoordinator() {
-		return context().actorOf(
-			ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, config, schedulerDriver),
-			"reconciliationCoordinator");
-	}
-
-	@Override
-	public void postStop() {
-		LOG.info("Stopping Mesos resource master");
-		super.postStop();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Actor messages
-	// ------------------------------------------------------------------------
-
-	@Override
-	protected void handleMessage(Object message) {
-
-		// check for Mesos-specific actor messages first
-
-		// --- messages about Mesos connection
-		if (message instanceof Registered) {
-			registered((Registered) message);
-		} else if (message instanceof ReRegistered) {
-			reregistered((ReRegistered) message);
-		} else if (message instanceof Disconnected) {
-			disconnected((Disconnected) message);
-		} else if (message instanceof Error) {
-			error(((Error) message).message());
-
-		// --- messages about offers
-		} else if (message instanceof ResourceOffers || message instanceof OfferRescinded) {
-			launchCoordinator.tell(message, self());
-		} else if (message instanceof AcceptOffers) {
-			acceptOffers((AcceptOffers) message);
-
-		// --- messages about tasks
-		} else if (message instanceof StatusUpdate) {
-			taskStatusUpdated((StatusUpdate) message);
-		} else if (message instanceof ReconciliationCoordinator.Reconcile) {
-			// a reconciliation request from a task
-			reconciliationCoordinator.tell(message, self());
-		} else if (message instanceof TaskMonitor.TaskTerminated) {
-			// a termination message from a task
-			TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
-			taskTerminated(msg.taskID(), msg.status());
-
-		} else {
-			// message handled by the generic resource master code
-			super.handleMessage(message);
-		}
-	}
-
-	/**
-	 * Called to shut down the cluster (not a failover situation).
-	 *
-	 * @param finalStatus The application status to report.
-	 * @param optionalDiagnostics An optional diagnostics message.
-	 */
-	@Override
-	protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
-
-		LOG.info("Shutting down and unregistering as a Mesos framework.");
-		try {
-			// unregister the framework, which implicitly removes all tasks.
-			schedulerDriver.stop(false);
-		} catch (Exception ex) {
-			LOG.warn("unable to unregister the framework", ex);
-		}
-
-		try {
-			workerStore.stop(true);
-		} catch (Exception ex) {
-			LOG.warn("unable to stop the worker state store", ex);
-		}
-
-		context().stop(self());
-	}
-
-	@Override
-	protected void fatalError(String message, Throwable error) {
-		// we do not unregister, but cause a hard fail of this process, to have it
-		// restarted by the dispatcher
-		LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + message, error);
-		LOG.error("Shutting down process");
-
-		// kill this process, this will make an external supervisor (the dispatcher) restart the process
-		System.exit(EXIT_CODE_FATAL_ERROR);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Worker Management
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Recover framework/worker information persisted by a prior incarnation of the RM.
-	 */
-	private void recoverWorkers() throws Exception {
-		// if this application master starts as part of an ApplicationMaster/JobManager recovery,
-		// then some worker tasks are most likely still alive and we can re-obtain them
-		final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
-
-		if (!tasksFromPreviousAttempts.isEmpty()) {
-			LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
-
-			List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
-			List<LaunchableTask> toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size());
-
-			for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
-				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
-
-				switch (worker.state()) {
-					case New:
-						workersInNew.put(extractResourceID(worker.taskID()), worker);
-						toLaunch.add(launchable);
-						break;
-					case Launched:
-						workersInLaunch.put(extractResourceID(worker.taskID()), worker);
-						toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
-						break;
-					case Released:
-						workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
-						break;
-				}
-				taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
-			}
-
-			// tell the launch coordinator about prior assignments
-			if (toAssign.size() >= 1) {
-				launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self());
-			}
-			// tell the launch coordinator to launch any new tasks
-			if (toLaunch.size() >= 1) {
-				launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
-			}
-		}
-	}
-
-	/**
-	 * Plan for some additional workers to be launched.
-	 *
-	 * @param numWorkers The number of workers to allocate.
-	 */
-	@Override
-	protected void requestNewWorkers(int numWorkers) {
-
-		try {
-			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(numWorkers);
-			List<LaunchableTask> toLaunch = new ArrayList<>(numWorkers);
-
-			// generate new workers into persistent state and launch associated actors
-			for (int i = 0; i < numWorkers; i++) {
-				MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newWorker(workerStore.newTaskID());
-				workerStore.putWorker(worker);
-				workersInNew.put(extractResourceID(worker.taskID()), worker);
-
-				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
-
-				LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus, {} gpus).",
-					launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs(),
-					launchable.taskRequest().getScalarRequests().get("gpus"));
-
-				toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
-				toLaunch.add(launchable);
-			}
-
-			// tell the task router about the new plans
-			for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
-				taskRouter.tell(update, self());
-			}
-
-			// tell the launch coordinator to launch the new tasks
-			if (toLaunch.size() >= 1) {
-				launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
-			}
-		} catch (Exception ex) {
-			fatalError("unable to request new workers", ex);
-		}
-	}
-
-	/**
-	 * Accept offers as advised by the launch coordinator.
-	 *
-	 * <p>Acceptance is routed through the RM to update the persistent state before
-	 * forwarding the message to Mesos.
-	 */
-	private void acceptOffers(AcceptOffers msg) {
-
-		try {
-			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size());
-
-			// transition the persistent state of some tasks to Launched
-			for (Protos.Offer.Operation op : msg.operations()) {
-				if (op.getType() != Protos.Offer.Operation.Type.LAUNCH) {
-					continue;
-				}
-				for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
-					MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId()));
-					assert (worker != null);
-
-					worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
-					workerStore.putWorker(worker);
-					workersInLaunch.put(extractResourceID(worker.taskID()), worker);
-
-					LOG.info("Launching Mesos task {} on host {}.",
-						worker.taskID().getValue(), worker.hostname().get());
-
-					toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
-				}
-			}
-
-			// tell the task router about the new plans
-			for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
-				taskRouter.tell(update, self());
-			}
-
-			// send the acceptance message to Mesos
-			schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
-		} catch (Exception ex) {
-			fatalError("unable to accept offers", ex);
-		}
-	}
-
-	/**
-	 * Handle a task status change.
-	 */
-	private void taskStatusUpdated(StatusUpdate message) {
-		taskRouter.tell(message, self());
-		reconciliationCoordinator.tell(message, self());
-		schedulerDriver.acknowledgeStatusUpdate(message.status());
-	}
-
-	/**
-	 * Accept the given started worker into the internal state.
-	 *
-	 * @param resourceID The worker resource id
-	 * @return A registered worker node record.
-	 */
-	@Override
-	protected RegisteredMesosWorkerNode workerStarted(ResourceID resourceID) {
-		MesosWorkerStore.Worker inLaunch = workersInLaunch.remove(resourceID);
-		if (inLaunch == null) {
-			// Worker was not in state "being launched", this can indicate that the TaskManager
-			// in this worker was already registered or that the container was not started
-			// by this resource manager. Simply ignore this resourceID.
-			return null;
-		}
-		return new RegisteredMesosWorkerNode(inLaunch);
-	}
-
-	/**
-	 * Accept the given registered workers into the internal state.
-	 *
-	 * @param toConsolidate The worker IDs known previously to the JobManager.
-	 * @return A collection of registered worker node records.
-	 */
-	@Override
-	protected Collection<RegisteredMesosWorkerNode> reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
-
-		// we check for each task manager if we recognize its Mesos task ID
-		List<RegisteredMesosWorkerNode> accepted = new ArrayList<>(toConsolidate.size());
-		for (ResourceID resourceID : toConsolidate) {
-			MesosWorkerStore.Worker worker = workersInLaunch.remove(resourceID);
-			if (worker != null) {
-				LOG.info("Mesos worker consolidation recognizes TaskManager {}.", resourceID);
-				accepted.add(new RegisteredMesosWorkerNode(worker));
-			} else {
-				if (isStarted(resourceID)) {
-					LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID);
-				}
-				else {
-					LOG.info("Mesos worker consolidation does not recognize TaskManager {}.", resourceID);
-				}
-			}
-		}
-		return accepted;
-	}
-
-	/**
-	 * Release the given pending worker.
-	 */
-	@Override
-	protected void releasePendingWorker(ResourceID id) {
-		MesosWorkerStore.Worker worker = workersInLaunch.remove(id);
-		if (worker != null) {
-			releaseWorker(worker);
-		} else {
-			LOG.error("Cannot find worker {} to release. Ignoring request.", id);
-		}
-	}
-
-	/**
-	 * Release the given started worker.
-	 */
-	@Override
-	protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) {
-		releaseWorker(worker.getWorker());
-	}
-
-	/**
-	 * Plan for the removal of the given worker.
-	 */
-	private void releaseWorker(MesosWorkerStore.Worker worker) {
-		try {
-			LOG.info("Releasing worker {}", worker.taskID());
-
-			// update persistent state of worker to Released
-			worker = worker.releaseWorker();
-			workerStore.putWorker(worker);
-			workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
-			taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
-
-			if (worker.hostname().isDefined()) {
-				// tell the launch coordinator that the task is being unassigned from the host, for planning purposes
-				launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), self());
-			}
-		}
-		catch (Exception ex) {
-			fatalError("unable to release worker", ex);
-		}
-	}
-
-	@Override
-	protected int getNumWorkerRequestsPending() {
-		return workersInNew.size();
-	}
-
-	@Override
-	protected int getNumWorkersPendingRegistration() {
-		return workersInLaunch.size();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Callbacks from the Mesos Master
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Called when connected to Mesos as a new framework.
-	 */
-	private void registered(Registered message) {
-		connectionMonitor.tell(message, self());
-
-		try {
-			workerStore.setFrameworkID(Option.apply(message.frameworkId()));
-		} catch (Exception ex) {
-			fatalError("unable to store the assigned framework ID", ex);
-			return;
-		}
-
-		launchCoordinator.tell(message, self());
-		reconciliationCoordinator.tell(message, self());
-		taskRouter.tell(message, self());
-	}
-
-	/**
-	 * Called when reconnected to Mesos following a failover event.
-	 */
-	private void reregistered(ReRegistered message) {
-		connectionMonitor.tell(message, self());
-		launchCoordinator.tell(message, self());
-		reconciliationCoordinator.tell(message, self());
-		taskRouter.tell(message, self());
-	}
-
-	/**
-	 * Called when disconnected from Mesos.
-	 */
-	private void disconnected(Disconnected message) {
-		connectionMonitor.tell(message, self());
-		launchCoordinator.tell(message, self());
-		reconciliationCoordinator.tell(message, self());
-		taskRouter.tell(message, self());
-	}
-
-	/**
-	 * Called when an error is reported by the scheduler callback.
-	 */
-	private void error(String message) {
-		self().tell(new FatalErrorOccurred("Connection to Mesos failed", new Exception(message)), self());
-	}
-
-	/**
-	 * Invoked when a Mesos task reaches a terminal status.
-	 */
-	private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus status) {
-		// this callback occurs for failed containers and for released containers alike
-
-		final ResourceID id = extractResourceID(taskID);
-
-		boolean existed;
-		try {
-			existed = workerStore.removeWorker(taskID);
-		} catch (Exception ex) {
-			fatalError("unable to remove worker", ex);
-			return;
-		}
-
-		if (!existed) {
-			LOG.info("Received a termination notice for an unrecognized worker: {}", id);
-			return;
-		}
-
-		// check if this is a failed task or a released task
-		if (workersBeingReturned.remove(id) != null) {
-			// regular finished worker that we released
-			LOG.info("Worker {} finished successfully with diagnostics: {}",
-				id, status.getMessage());
-		} else {
-			// failed worker, either at startup, or running
-			final MesosWorkerStore.Worker launched = workersInLaunch.remove(id);
-			if (launched != null) {
-				LOG.info("Mesos task {} failed, with a TaskManager in launch or registration. " +
-					"State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage());
-				// we will trigger re-acquiring new workers at the end
-			} else {
-				// failed registered worker
-				LOG.info("Mesos task {} failed, with a registered TaskManager. " +
-					"State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage());
-
-				// notify the generic logic, which notifies the JobManager, etc.
-				notifyWorkerFailed(id, "Mesos task " + id + " failed.  State: " + status.getState());
-			}
-
-			// general failure logging
-			failedTasksSoFar++;
-
-			String diagMessage = String.format("Diagnostics for task %s in state %s : " +
-					"reason=%s message=%s",
-				id, status.getState(), status.getReason(), status.getMessage());
-			sendInfoMessage(diagMessage);
-
-			LOG.info(diagMessage);
-			LOG.info("Total number of failed tasks so far: {}", failedTasksSoFar);
-
-			// maxFailedTasks == -1 is infinite number of retries.
-			if (maxFailedTasks >= 0 && failedTasksSoFar > maxFailedTasks) {
-				String msg = "Stopping Mesos session because the number of failed tasks ("
-					+ failedTasksSoFar + ") exceeded the maximum failed tasks ("
-					+ maxFailedTasks + "). This number is controlled by the '"
-					+ MesosOptions.MAX_FAILED_TASKS.key() + "' configuration setting. "
-					+ "By default its the number of requested tasks.";
-
-				LOG.error(msg);
-				self().tell(decorateMessage(new StopCluster(ApplicationStatus.FAILED, msg)),
-					ActorRef.noSender());
-
-				// no need to do anything else
-				return;
-			}
-		}
-
-		// in case failed containers were among the finished containers, make
-		// sure we re-examine and request new ones
-		triggerCheckWorkers();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
-		LaunchableMesosWorker launchable =
-			new LaunchableMesosWorker(
-				artifactResolver,
-				taskManagerParameters,
-				taskManagerContainerSpec,
-				taskID,
-				mesosConfig);
-
-		return launchable;
-	}
-
-	/**
-	 * Extracts a unique ResourceID from the Mesos task.
-	 *
-	 * @param taskId the Mesos TaskID
-	 * @return The ResourceID for the container
-	 */
-	static ResourceID extractResourceID(Protos.TaskID taskId) {
-		return new ResourceID(taskId.getValue());
-	}
-
-	/**
-	 * Extracts the Mesos task goal state from the worker information.
-	 * @param worker the persistent worker information.
-	 * @return goal state information for the {@Link TaskMonitor}.
-	 */
-	static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
-		switch (worker.state()) {
-			case New:
-				return new TaskMonitor.New(worker.taskID());
-			case Launched:
-				return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
-			case Released:
-				return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
-			default:
-				throw new IllegalArgumentException("unsupported worker state");
-		}
-	}
-
-	/**
-	 * Creates the Fenzo optimizer (builder).
-	 * The builder is an indirection to facilitate unit testing of the Launch Coordinator.
-	 */
-	private static TaskSchedulerBuilder createOptimizer() {
-		return new TaskSchedulerBuilder() {
-			TaskScheduler.Builder builder = new TaskScheduler.Builder();
-
-			@Override
-			public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action) {
-				builder.withLeaseRejectAction(action);
-				return this;
-			}
-
-			@Override
-			public TaskScheduler build() {
-				return builder.build();
-			}
-		};
-	}
-
-	/**
-	 * Creates the props needed to instantiate this actor.
-	 *
-	 * <p>Rather than extracting and validating parameters in the constructor, this factory method takes
-	 * care of that. That way, errors occur synchronously, and are not swallowed simply in a
-	 * failed asynchronous attempt to start the actor.
-
-	 * @param actorClass
-	 *             The actor class, to allow overriding this actor with subclasses for testing.
-	 * @param flinkConfig
-	 *             The Flink configuration object.
-	 * @param taskManagerParameters
-	 *             The parameters for launching TaskManager containers.
-	 * @param taskManagerContainerSpec
-	 *             The container specification.
-	 * @param artifactResolver
-	 *             The artifact resolver to locate artifacts
-	 * @param log
-	 *             The logger to log to.
-	 *
-	 * @return The Props object to instantiate the MesosFlinkResourceManager actor.
-	 */
-	public static Props createActorProps(
-			Class<? extends MesosFlinkResourceManager> actorClass,
-			Configuration flinkConfig,
-			MesosConfiguration mesosConfig,
-			MesosWorkerStore workerStore,
-			LeaderRetrievalService leaderRetrievalService,
-			MesosTaskManagerParameters taskManagerParameters,
-			ContainerSpecification taskManagerContainerSpec,
-			MesosArtifactResolver artifactResolver,
-			Logger log) {
-
-		final int numInitialTaskManagers = flinkConfig.getInteger(
-			MesosOptions.INITIAL_TASKS);
-		if (numInitialTaskManagers >= 0) {
-			log.info("Mesos framework to allocate {} initial tasks",
-				numInitialTaskManagers);
-		}
-		else {
-			throw new IllegalConfigurationException("Invalid value for " +
-				MesosOptions.INITIAL_TASKS.key() + ", which must be at least zero.");
-		}
-
-		final int maxFailedTasks = flinkConfig.getInteger(
-			MesosOptions.MAX_FAILED_TASKS.key(), numInitialTaskManagers);
-		if (maxFailedTasks >= 0) {
-			log.info("Mesos framework tolerates {} failed tasks before giving up",
-				maxFailedTasks);
-		}
-
-		return Props.create(actorClass,
-			flinkConfig,
-			mesosConfig,
-			workerStore,
-			leaderRetrievalService,
-			taskManagerParameters,
-			taskManagerContainerSpec,
-			artifactResolver,
-			maxFailedTasks,
-			numInitialTaskManagers);
-	}
-}
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java
index c65c482..e12e6b8 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Preconditions;
 import java.io.Serializable;
 
 /**
- * A representation of a registered Mesos task managed by the {@link MesosFlinkResourceManager}.
+ * A representation of a registered Mesos task managed by the {@link MesosResourceManager}.
  */
 public class RegisteredMesosWorkerNode implements Serializable, ResourceIDRetrievable {
 
diff --git a/flink-mesos/src/main/resources/log4j.properties b/flink-mesos/src/main/resources/log4j.properties
index 61e5afe..64d49ec 100644
--- a/flink-mesos/src/main/resources/log4j.properties
+++ b/flink-mesos/src/main/resources/log4j.properties
@@ -24,4 +24,3 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 log4j.logger.org.apache.flink.mesos=DEBUG
-log4j.logger.org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager=INFO
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
deleted file mode 100644
index 31394b2..0000000
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ /dev/null
@@ -1,785 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.runtime.clusterframework;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.mesos.configuration.MesosOptions;
-import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
-import org.apache.flink.mesos.scheduler.ConnectionMonitor;
-import org.apache.flink.mesos.scheduler.LaunchCoordinator;
-import org.apache.flink.mesos.scheduler.TaskMonitor;
-import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
-import org.apache.flink.mesos.scheduler.messages.Disconnected;
-import org.apache.flink.mesos.scheduler.messages.Error;
-import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
-import org.apache.flink.mesos.scheduler.messages.ReRegistered;
-import org.apache.flink.mesos.scheduler.messages.Registered;
-import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
-import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.flink.mesos.util.MesosArtifactResolver;
-import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
-import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
-import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
-import org.apache.flink.runtime.clusterframework.messages.StopCluster;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import akka.testkit.TestProbe;
-import com.netflix.fenzo.ConstraintEvaluator;
-import junit.framework.AssertionFailedError;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-
-import scala.Option;
-
-import static java.util.Collections.singletonList;
-import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState;
-import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasEntry;
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * General tests for the Mesos resource manager component.
- */
-public class MesosFlinkResourceManagerTest extends TestLogger {
-
-	private static final Logger LOG = LoggerFactory.getLogger(MesosFlinkResourceManagerTest.class);
-
-	private static ActorSystem system;
-
-	private static Configuration config = new Configuration() {
-		private static final long serialVersionUID = -952579203067648838L;
-
-		{
-			setInteger(MesosOptions.MAX_FAILED_TASKS, -1);
-			setInteger(MesosOptions.INITIAL_TASKS, 0);
-	}};
-
-	@BeforeClass
-	public static void setup() {
-		system = AkkaUtils.createLocalActorSystem(config);
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
-	/**
-	 * The RM with some test-specific behavior.
-	 */
-	static class TestingMesosFlinkResourceManager extends MesosFlinkResourceManager {
-
-		public TestProbe connectionMonitor = new TestProbe(system);
-		public TestProbe taskRouter = new TestProbe(system);
-		public TestProbe launchCoordinator = new TestProbe(system);
-		public TestProbe reconciliationCoordinator = new TestProbe(system);
-
-		public TestingMesosFlinkResourceManager(
-			Configuration flinkConfig,
-			MesosConfiguration mesosConfig,
-			MesosWorkerStore workerStore,
-			LeaderRetrievalService leaderRetrievalService,
-			MesosTaskManagerParameters taskManagerParameters,
-			ContainerSpecification taskManagerContainerSpec,
-			MesosArtifactResolver artifactResolver,
-			int maxFailedTasks,
-			int numInitialTaskManagers) {
-
-			super(flinkConfig, mesosConfig, workerStore, leaderRetrievalService, taskManagerParameters,
-				taskManagerContainerSpec, artifactResolver, maxFailedTasks, numInitialTaskManagers);
-		}
-
-		@Override
-		protected ActorRef createConnectionMonitor() {
-			return connectionMonitor.ref();
-		}
-
-		@Override
-		protected ActorRef createTaskRouter() {
-			return taskRouter.ref();
-		}
-
-		@Override
-		protected ActorRef createLaunchCoordinator() {
-			return launchCoordinator.ref();
-		}
-
-		@Override
-		protected ActorRef createReconciliationCoordinator() {
-			return reconciliationCoordinator.ref();
-		}
-
-		@Override
-		protected void fatalError(String message, Throwable error) {
-			// override the super's behavior of exiting the process
-			context().stop(self());
-		}
-	}
-
-	/**
-	 * The context fixture.
-	 */
-	static class Context extends JavaTestKit implements AutoCloseable {
-
-		// mocks
-		public ActorGateway jobManager;
-		public MesosConfiguration mesosConfig;
-		public MesosWorkerStore workerStore;
-		public MesosArtifactResolver artifactResolver;
-		public SchedulerDriver schedulerDriver;
-		public TestingMesosFlinkResourceManager resourceManagerInstance;
-		public ActorGateway resourceManager;
-
-		// domain objects for test purposes
-		Protos.FrameworkID framework1 = Protos.FrameworkID.newBuilder().setValue("framework1").build();
-		public Protos.SlaveID slave1 = Protos.SlaveID.newBuilder().setValue("slave1").build();
-		public String slave1host = "localhost";
-		public Protos.OfferID offer1 = Protos.OfferID.newBuilder().setValue("offer1").build();
-		public Protos.TaskID task1 = Protos.TaskID.newBuilder().setValue("taskmanager-00001").build();
-		public Protos.TaskID task2 = Protos.TaskID.newBuilder().setValue("taskmanager-00002").build();
-		public Protos.TaskID task3 = Protos.TaskID.newBuilder().setValue("taskmanager-00003").build();
-
-		private final TestingHighAvailabilityServices highAvailabilityServices;
-
-		/**
-		 * Create mock RM dependencies.
-		 */
-		public Context() {
-			super(system);
-
-			try {
-				jobManager = TestingUtils.createForwardingActor(
-					system,
-					getTestActor(),
-					HighAvailabilityServices.DEFAULT_LEADER_ID,
-					Option.<String>empty());
-
-				highAvailabilityServices = new TestingHighAvailabilityServices();
-
-				highAvailabilityServices.setJobMasterLeaderRetriever(
-					HighAvailabilityServices.DEFAULT_JOB_ID,
-					new SettableLeaderRetrievalService(
-						jobManager.path(),
-						HighAvailabilityServices.DEFAULT_LEADER_ID));
-
-				// scheduler driver
-				schedulerDriver = mock(SchedulerDriver.class);
-
-				// config
-				mesosConfig = mock(MesosConfiguration.class);
-				when(mesosConfig.frameworkInfo()).thenReturn(Protos.FrameworkInfo.newBuilder());
-				when(mesosConfig.withFrameworkInfo(any(Protos.FrameworkInfo.Builder.class))).thenReturn(mesosConfig);
-				when(mesosConfig.createDriver(any(Scheduler.class), anyBoolean())).thenReturn(schedulerDriver);
-
-				// worker store
-				workerStore = mock(MesosWorkerStore.class);
-				when(workerStore.getFrameworkID()).thenReturn(Option.<Protos.FrameworkID>empty());
-
-				// artifact
-				artifactResolver = mock(MesosArtifactResolver.class);
-			} catch (Exception ex) {
-				throw new RuntimeException(ex);
-			}
-		}
-
-		/**
-		 * Initialize the resource manager.
-		 */
-		public void initialize() {
-			ContainerSpecification containerSpecification = new ContainerSpecification();
-			ContaineredTaskManagerParameters containeredParams =
-				new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
-			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
-				1.0, 1,
-				MesosTaskManagerParameters.ContainerType.MESOS,
-				Option.<String>empty(),
-				containeredParams,
-				Collections.<Protos.Volume>emptyList(),
-				Collections.<Protos.Parameter>emptyList(),
-				false,
-				Collections.<ConstraintEvaluator>emptyList(),
-				"",
-				Option.<String>empty(),
-				Option.<String>empty(),
-				Collections.<String>emptyList());
-
-			TestActorRef<TestingMesosFlinkResourceManager> resourceManagerRef =
-				TestActorRef.create(system, MesosFlinkResourceManager.createActorProps(
-					TestingMesosFlinkResourceManager.class,
-					config,
-					mesosConfig,
-					workerStore,
-					highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-					tmParams,
-					containerSpecification,
-					artifactResolver,
-					LOG));
-			resourceManagerInstance = resourceManagerRef.underlyingActor();
-			resourceManager = new AkkaActorGateway(resourceManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-			verify(schedulerDriver).start();
-			resourceManagerInstance.connectionMonitor.expectMsgClass(ConnectionMonitor.Start.class);
-		}
-
-		/**
-		 * Send a RegisterResourceManagerSuccessful message to the RM.
-		 * @param currentlyRegisteredTaskManagers the already-registered workers.
-         */
-		public void register(Collection<ResourceID> currentlyRegisteredTaskManagers) {
-			// register with JM
-			expectMsgClass(RegisterResourceManager.class);
-			resourceManager.tell(
-				new RegisterResourceManagerSuccessful(jobManager.actor(), currentlyRegisteredTaskManagers),
-				jobManager);
-		}
-
-		/**
-		 * Prepares a launch operation.
-         */
-		public Protos.Offer.Operation launch(Protos.TaskInfo... taskInfo) {
-			return Protos.Offer.Operation.newBuilder()
-				.setType(Protos.Offer.Operation.Type.LAUNCH)
-				.setLaunch(Protos.Offer.Operation.Launch.newBuilder().addAllTaskInfos(Arrays.asList(taskInfo))
-				).build();
-		}
-
-		@Override
-		public void close() throws Exception {
-			highAvailabilityServices.closeAndCleanupAllData();
-		}
-	}
-
-	/**
-	 * Test recovery of persistent workers.
-	 */
-	@Test
-	public void testRecoverWorkers() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						// set the initial persistent state then initialize the RM
-						MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
-						MesosWorkerStore.Worker worker2 = MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host);
-						MesosWorkerStore.Worker worker3 = MesosWorkerStore.Worker.newWorker(task3).launchWorker(slave1, slave1host).releaseWorker();
-						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
-						when(workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1, worker2, worker3));
-						initialize();
-
-						// verify that the internal state was updated, the task router was notified,
-						// and the launch coordinator was asked to launch a task
-						assertThat(resourceManagerInstance.workersInNew, hasEntry(extractResourceID(task1), worker1));
-						assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task2), worker2));
-						assertThat(resourceManagerInstance.workersBeingReturned, hasEntry(extractResourceID(task3), worker3));
-						resourceManagerInstance.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
-						LaunchCoordinator.Assign actualAssign =
-							resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Assign.class);
-						assertThat(actualAssign.tasks(), hasSize(1));
-						assertThat(actualAssign.tasks().get(0).f0.getId(), equalTo(task2.getValue()));
-						assertThat(actualAssign.tasks().get(0).f1, equalTo(slave1host));
-						resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
-
-						register(Collections.<ResourceID>emptyList());
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-
-	/**
-	 * Test re-acceptance of registered workers upon JM registration.
-	 */
-	@Test
-	public void testReacceptRegisteredWorkers() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						// set the initial persistent state then initialize the RM
-						MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
-						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
-						when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
-						initialize();
-
-						// send RegisterResourceManagerSuccessful to the RM with some 'known' workers.
-						// This will cause the RM to reaccept the workers.
-						assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
-						register(singletonList(extractResourceID(task1)));
-						assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-
-	/**
-	 * Test normal worker registration.
-	 */
-	@Test
-	public void testWorkerRegistered() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						// set the initial state with a (recovered) launched worker
-						MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
-						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
-						when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
-						initialize();
-						assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
-						register(Collections.<ResourceID>emptyList());
-
-						// send registration message
-						NotifyResourceStarted msg = new NotifyResourceStarted(extractResourceID(task1));
-						resourceManager.tell(msg);
-
-						// verify that the internal state was updated
-						assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-
-	/**
-	 * Test release of registered workers.
-	 */
-	@Test
-	public void testReleaseRegisteredWorker() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						// set the initial persistent state, initialize the RM, then register with task1 as a registered worker
-						MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
-						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
-						when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1));
-						initialize();
-						resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Assign.class);
-						register(singletonList(extractResourceID(task1)));
-
-						// release the registered worker
-						resourceManager.tell(new RemoveResource(extractResourceID(task1)));
-
-						// verify that the worker was persisted, the internal state was updated, the task router was notified,
-						// and the launch coordinator was notified about the host assignment change
-						MesosWorkerStore.Worker worker2Released = worker1.releaseWorker();
-						verify(workerStore).putWorker(worker2Released);
-						assertThat(resourceManagerInstance.workersBeingReturned, hasEntry(extractResourceID(task1), worker2Released));
-						resourceManagerInstance.launchCoordinator.expectMsg(new LaunchCoordinator.Unassign(task1, slave1host));
-
-						// send the subsequent terminated message
-						when(workerStore.removeWorker(task1)).thenReturn(true);
-						resourceManager.tell(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
-							.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FINISHED).build()));
-
-						// verify that the instance state was updated
-						assertThat(resourceManagerInstance.workersBeingReturned.entrySet(), empty());
-						verify(workerStore).removeWorker(task1);
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-
-	/**
-	 * Test request for new workers.
-	 */
-	@Test
-	public void testRequestNewWorkers() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						initialize();
-						register(Collections.<ResourceID>emptyList());
-
-						// set the target pool size
-						when(workerStore.newTaskID()).thenReturn(task1).thenThrow(new AssertionFailedError());
-						resourceManager.tell(new SetWorkerPoolSize(1), jobManager);
-
-						// verify that a new worker was persisted, the internal state was updated, the task router was notified,
-						// and the launch coordinator was asked to launch a task
-						MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(task1);
-						verify(workerStore).putWorker(expected);
-						assertThat(resourceManagerInstance.workersInNew, hasEntry(extractResourceID(task1), expected));
-						resourceManagerInstance.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
-						resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-
-	/**
-	 * Test offer handling.
-	 */
-	@Test
-	public void testOfferHandling() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					initialize();
-					register(Collections.<ResourceID>emptyList());
-
-					// Verify that the RM forwards offers to the launch coordinator.
-					resourceManager.tell(new ResourceOffers(Collections.<Protos.Offer>emptyList()));
-					resourceManagerInstance.launchCoordinator.expectMsgClass(ResourceOffers.class);
-					resourceManager.tell(new OfferRescinded(offer1));
-					resourceManagerInstance.launchCoordinator.expectMsgClass(OfferRescinded.class);
-				}
-			};
-		}};
-	}
-
-	/**
-	 * Test offer acceptance.
-	 */
-	@Test
-	public void testAcceptOffers() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						// set the initial persistent state with a new task then initialize the RM
-						MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
-						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
-						when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1));
-						initialize();
-						assertThat(resourceManagerInstance.workersInNew, hasEntry(extractResourceID(task1), worker1));
-						resourceManagerInstance.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
-						register(Collections.<ResourceID>emptyList());
-
-						// send an AcceptOffers message as the LaunchCoordinator would
-						// to launch task1 onto slave1 with offer1
-						Protos.TaskInfo task1info = Protos.TaskInfo.newBuilder()
-							.setTaskId(task1).setName("").setSlaveId(slave1).build();
-						AcceptOffers msg = new AcceptOffers(slave1host, singletonList(offer1), singletonList(launch(task1info)));
-						resourceManager.tell(msg);
-
-						// verify that the worker was persisted, the internal state was updated,
-						// Mesos was asked to launch task1, and the task router was notified
-						MesosWorkerStore.Worker worker1launched = worker1.launchWorker(slave1, slave1host);
-						verify(workerStore).putWorker(worker1launched);
-						assertThat(resourceManagerInstance.workersInNew.entrySet(), empty());
-						assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
-						resourceManagerInstance.taskRouter.expectMsg(
-							new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker1launched)));
-						verify(schedulerDriver).acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-
-	/**
-	 * Test status handling.
-	 */
-	@Test
-	public void testStatusHandling() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					initialize();
-					register(Collections.<ResourceID>emptyList());
-
-					// Verify that the RM forwards status updates to the launch coordinator and task router.
-					resourceManager.tell(new StatusUpdate(Protos.TaskStatus.newBuilder()
-						.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_LOST).build()),
-						resourceManager);
-					resourceManagerInstance.reconciliationCoordinator.expectMsgClass(StatusUpdate.class);
-					resourceManagerInstance.taskRouter.expectMsgClass(StatusUpdate.class);
-				}
-			};
-		}};
-	}
-
-	/**
-	 * Test unplanned task failure of a pending worker.
-	 */
-	@Test
-	public void testPendingWorkerFailed() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						// set the initial persistent state with a launched worker that hasn't yet registered
-						MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
-						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
-						when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
-						initialize();
-						register(Collections.<ResourceID>emptyList());
-
-						// tell the RM that a task failed (and prepare a replacement task)
-						when(workerStore.newTaskID()).thenReturn(task2);
-						when(workerStore.removeWorker(task1)).thenReturn(true);
-						resourceManager.tell(new SetWorkerPoolSize(1), jobManager);
-						resourceManager.tell(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
-							.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
-
-						// verify that the instance state was updated
-						assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
-						verify(workerStore).newTaskID();
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-
-	/**
-	 * Test unplanned task failure of a registered worker.
-	 */
-	@Test
-	public void testRegisteredWorkerFailed() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						// set the initial persistent state with a launched & registered worker
-						MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
-						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
-						when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
-						initialize();
-						register(singletonList(extractResourceID(task1)));
-
-						// tell the RM that a task failed (and prepare a replacement task)
-						when(workerStore.newTaskID()).thenReturn(task2);
-						when(workerStore.removeWorker(task1)).thenReturn(true);
-						resourceManager.tell(new SetWorkerPoolSize(1), jobManager);
-						resourceManager.tell(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
-							.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
-
-						// verify that the instance state was updated and a replacement was created
-						assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
-						expectMsgClass(ResourceRemoved.class);
-						verify(workerStore).newTaskID();
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-
-	/**
-	 * Test cluster stop handling.
-	 */
-	@Test
-	public void testStopApplication() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						initialize();
-						register(Collections.<ResourceID>emptyList());
-						watch(resourceManager.actor());
-						resourceManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, ""), resourceManager);
-
-						// verify that the Mesos framework is shutdown
-						verify(schedulerDriver).stop(false);
-						verify(workerStore).stop(true);
-						expectTerminated(resourceManager.actor());
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-
-	// ------------- connectivity tests -----------------------------
-
-	/**
-	 * Test Mesos registration handling.
-	 */
-	@Test
-	public void testRegistered() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						initialize();
-						register(Collections.<ResourceID>emptyList());
-
-						Protos.MasterInfo masterInfo = Protos.MasterInfo.newBuilder()
-							.setId("master1").setIp(0).setPort(5050).build();
-						resourceManager.tell(new Registered(framework1, masterInfo), resourceManager);
-
-						verify(workerStore).setFrameworkID(Option.apply(framework1));
-						resourceManagerInstance.connectionMonitor.expectMsgClass(Registered.class);
-						resourceManagerInstance.reconciliationCoordinator.expectMsgClass(Registered.class);
-						resourceManagerInstance.launchCoordinator.expectMsgClass(Registered.class);
-						resourceManagerInstance.taskRouter.expectMsgClass(Registered.class);
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-
-
-	/**
-	 * Test Mesos re-registration handling.
-	 */
-	@Test
-	public void testReRegistered() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
-						initialize();
-						register(Collections.<ResourceID>emptyList());
-
-						Protos.MasterInfo masterInfo = Protos.MasterInfo.newBuilder()
-							.setId("master1").setIp(0).setPort(5050).build();
-						resourceManager.tell(new ReRegistered(masterInfo), resourceManager);
-
-						resourceManagerInstance.connectionMonitor.expectMsgClass(ReRegistered.class);
-						resourceManagerInstance.reconciliationCoordinator.expectMsgClass(ReRegistered.class);
-						resourceManagerInstance.launchCoordinator.expectMsgClass(ReRegistered.class);
-						resourceManagerInstance.taskRouter.expectMsgClass(ReRegistered.class);
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-
-	/**
-	 * Test Mesos re-registration handling.
-	 */
-	@Test
-	public void testDisconnected() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
-						initialize();
-						register(Collections.<ResourceID>emptyList());
-
-						resourceManager.tell(new Disconnected(), resourceManager);
-
-						resourceManagerInstance.connectionMonitor.expectMsgClass(Disconnected.class);
-						resourceManagerInstance.reconciliationCoordinator.expectMsgClass(Disconnected.class);
-						resourceManagerInstance.launchCoordinator.expectMsgClass(Disconnected.class);
-						resourceManagerInstance.taskRouter.expectMsgClass(Disconnected.class);
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-
-	/**
-	 * Test Mesos scheduler error.
-	 */
-	@Test
-	public void testError() {
-		new Context() {{
-			new Within(duration("10 seconds")) {
-				@Override
-				protected void run() {
-					try {
-						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
-						initialize();
-						register(Collections.<ResourceID>emptyList());
-
-						watch(resourceManager.actor());
-						resourceManager.tell(new Error("test"), resourceManager);
-						expectTerminated(resourceManager.actor());
-					} catch (Exception ex) {
-						throw new RuntimeException(ex);
-					}
-				}
-			};
-		}};
-	}
-}
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 5c9aa46..c5d053c 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -106,8 +106,8 @@ import java.util.concurrent.TimeUnit;
 import scala.Option;
 
 import static java.util.Collections.singletonList;
-import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState;
-import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID;
+import static org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager.extractGoalState;
+import static org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager.extractResourceID;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;