You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/06/29 06:41:57 UTC

[1/4] [flink] Git Push Summary

Repository: flink
Updated Branches:
  refs/heads/master b59148cf7 -> 901a9caf4

[4/4] flink git commit: [FLINK-6379] Remove internal methods from MesosResourceManagerGateway

Posted by tr...@apache.org.
[FLINK-6379] Remove internal methods from MesosResourceManagerGateway

Some internal methods that are required for the interplay between the TaskMonitor,
LaunchCoordinator and the MesosResourceManager were exposed as RPC methods. In order
to keep the RPC interface as lean as possible, these methods have been removed from
the RPC gateway; the MesosResourceManager now invokes them internally.

Fix checkstyle violations


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/901a9caf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/901a9caf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/901a9caf

Branch: refs/heads/master
Commit: 901a9caf4cf49b6cd2f18f0d6f04045312e96d84
Parents: 4bb488c
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 27 15:04:17 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 29 08:41:34 2017 +0200

----------------------------------------------------------------------
 .../clusterframework/MesosResourceManager.java  | 328 ++++++++++++-------
 .../MesosResourceManagerActions.java            |  60 ++++
 .../MesosResourceManagerGateway.java            |  59 ----
 .../store/MesosWorkerStore.java                 |   1 +
 .../flink/mesos/scheduler/SchedulerGateway.java |  89 -----
 .../flink/mesos/scheduler/SchedulerProxyV2.java | 103 ------
 .../MesosResourceManagerTest.java               | 118 ++++---
 .../resourcemanager/ResourceManager.java        |  16 +-
 .../resourcemanager/ResourceManagerRunner.java  |   2 +-
 .../StandaloneResourceManager.java              |   2 +-
 .../StandaloneResourceManagerGateway.java       |  25 --
 .../taskexecutor/TaskExecutorITCase.java        |   3 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |   2 +-
 .../apache/flink/yarn/YarnResourceManager.java  |   2 +-
 .../flink/yarn/YarnResourceManagerGateway.java  |  27 --
 15 files changed, 341 insertions(+), 496 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index bb20060..43b58bc 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -18,15 +18,6 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import com.netflix.fenzo.TaskRequest;
-import com.netflix.fenzo.TaskScheduler;
-import com.netflix.fenzo.VirtualMachineLease;
-import com.netflix.fenzo.functions.Action1;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
@@ -34,13 +25,11 @@ import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
-import org.apache.flink.mesos.scheduler.SchedulerProxyV2;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
 import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
 import org.apache.flink.mesos.scheduler.Tasks;
 import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
 import org.apache.flink.mesos.scheduler.messages.Disconnected;
-import org.apache.flink.mesos.scheduler.messages.Error;
 import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
 import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
 import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
@@ -66,14 +55,23 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
 import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -81,42 +79,40 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import scala.Option;
+
 /**
  * The Mesos implementation of the resource manager.
  */
-public class MesosResourceManager extends ResourceManager<MesosResourceManagerGateway, RegisteredMesosWorkerNode> {
+public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerNode> {
 	protected static final Logger LOG = LoggerFactory.getLogger(MesosResourceManager.class);
 
-	/** The Flink configuration */
+	/** The Flink configuration. */
 	private final Configuration flinkConfig;
 
-	/** The Mesos configuration (master and framework info) */
+	/** The Mesos configuration (master and framework info). */
 	private final MesosConfiguration mesosConfig;
 
-	/** The TaskManager container parameters (like container memory size) */
+	/** The TaskManager container parameters (like container memory size). */
 	private final MesosTaskManagerParameters taskManagerParameters;
 
-	/** Container specification for launching a TM */
+	/** Container specification for launching a TM. */
 	private final ContainerSpecification taskManagerContainerSpec;
 
-	/** Resolver for HTTP artifacts */
+	/** Resolver for HTTP artifacts. */
 	private final MesosArtifactResolver artifactResolver;
 
-	/** Persistent storage of allocated containers */
+	/** Persistent storage of allocated containers. */
 	private final MesosWorkerStore workerStore;
 
-	/** A local actor system for using the helper actors */
+	/** A local actor system for using the helper actors. */
 	private final ActorSystem actorSystem;
 
-	/** Callback handler for the asynchronous Mesos scheduler */
-	private SchedulerProxyV2 schedulerCallbackHandler;
-
-	/** Mesos scheduler driver */
+	/** Mesos scheduler driver. */
 	private SchedulerDriver schedulerDriver;
 
-	/** an adapter to receive messages from Akka actors */
-	@VisibleForTesting
-	ActorRef selfActor;
+	/** an adapter to receive messages from Akka actors. */
+	private ActorRef selfActor;
 
 	private ActorRef connectionMonitor;
 
@@ -126,7 +122,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 	private ActorRef reconciliationCoordinator;
 
-	/** planning state related to workers - package private for unit test purposes */
+	/** planning state related to workers - package private for unit test purposes. */
 	final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
 	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
 	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
@@ -181,7 +177,8 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 	protected ActorRef createSelfActor() {
 		return actorSystem.actorOf(
-			AkkaAdapter.createActorProps(getSelf()),"ResourceManager");
+			Props.create(AkkaAdapter.class, this),
+			"ResourceManager");
 	}
 
 	protected ActorRef createConnectionMonitor() {
@@ -190,19 +187,21 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 			"connectionMonitor");
 	}
 
-	protected ActorRef createTaskRouter() {
+	protected ActorRef createTaskMonitor(SchedulerDriver schedulerDriver) {
 		return actorSystem.actorOf(
 			Tasks.createActorProps(Tasks.class, flinkConfig, schedulerDriver, TaskMonitor.class),
 			"tasks");
 	}
 
-	protected ActorRef createLaunchCoordinator() {
+	protected ActorRef createLaunchCoordinator(
+			SchedulerDriver schedulerDriver,
+			ActorRef selfActor) {
 		return actorSystem.actorOf(
 			LaunchCoordinator.createActorProps(LaunchCoordinator.class, selfActor, flinkConfig, schedulerDriver, createOptimizer()),
 			"launchCoordinator");
 	}
 
-	protected ActorRef createReconciliationCoordinator() {
+	protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriver) {
 		return actorSystem.actorOf(
 			ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, flinkConfig, schedulerDriver),
 			"reconciliationCoordinator");
@@ -220,14 +219,10 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		// start the worker store
 		try {
 			workerStore.start();
-		}
-		catch(Exception e) {
+		} catch (Exception e) {
 			throw new ResourceManagerException("Unable to initialize the worker store.", e);
 		}
 
-		// create the scheduler driver to communicate with Mesos
-		schedulerCallbackHandler = new SchedulerProxyV2(getSelf());
-
 		// register with Mesos
 		// TODO : defer connection until RM acquires leadership
 
@@ -242,27 +237,27 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 				LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
 				frameworkInfo.setId(frameworkID.get());
 			}
-		}
-		catch(Exception e) {
+		} catch (Exception e) {
 			throw new ResourceManagerException("Unable to recover the framework ID.", e);
 		}
 
 		MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
 		MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
-		schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
+		schedulerDriver = initializedMesosConfig.createDriver(
+			new MesosResourceManagerSchedulerCallback(),
+			false);
 
 		// create supporting actors
 		selfActor = createSelfActor();
 		connectionMonitor = createConnectionMonitor();
-		launchCoordinator = createLaunchCoordinator();
-		reconciliationCoordinator = createReconciliationCoordinator();
-		taskMonitor = createTaskRouter();
+		launchCoordinator = createLaunchCoordinator(schedulerDriver, selfActor);
+		reconciliationCoordinator = createReconciliationCoordinator(schedulerDriver);
+		taskMonitor = createTaskMonitor(schedulerDriver);
 
 		// recover state
 		try {
 			recoverWorkers();
-		}
-		catch(Exception e) {
+		} catch (Exception e) {
 			throw new ResourceManagerException("Unable to recover Mesos worker state.", e);
 		}
 
@@ -288,7 +283,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		if (!tasksFromPreviousAttempts.isEmpty()) {
 			LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
 
-			List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
+			List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
 
 			for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
 				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
@@ -310,31 +305,38 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 			}
 
 			// tell the launch coordinator about prior assignments
-			if(toAssign.size() >= 1) {
+			if (toAssign.size() >= 1) {
 				launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), selfActor);
 			}
 		}
 	}
 
 	@Override
-	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+	protected void shutDownApplication(
+			ApplicationStatus finalStatus,
+			String optionalDiagnostics) throws ResourceManagerException {
 		LOG.info("Shutting down and unregistering as a Mesos framework.");
+
+		Exception exception = null;
+
 		try {
 			// unregister the framework, which implicitly removes all tasks.
 			schedulerDriver.stop(false);
-		}
-		catch(Exception ex) {
-			LOG.warn("unable to unregister the framework", ex);
+		} catch (Exception ex) {
+			exception = new Exception("Could not unregister the Mesos framework.", ex);
 		}
 
 		try {
 			workerStore.stop(true);
-		}
-		catch(Exception ex) {
-			LOG.warn("unable to stop the worker state store", ex);
+		} catch (Exception ex) {
+			exception = ExceptionUtils.firstOrSuppressed(
+				new Exception("Could not stop the Mesos worker store.", ex),
+				exception);
 		}
 
-		LOG.info("Shutdown completed.");
+		if (exception != null) {
+			throw new ResourceManagerException("Could not properly shut down the Mesos application.", exception);
+		}
 	}
 
 	@Override
@@ -356,8 +358,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 			// tell the launch coordinator to launch the new tasks
 			launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor);
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			onFatalErrorAsync(new ResourceManagerException("Unable to request new workers.", ex));
 		}
 	}
@@ -387,16 +388,14 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 	}
 
 	// ------------------------------------------------------------------------
-	//  RPC methods
+	//  Mesos specific methods
 	// ------------------------------------------------------------------------
 
-	@RpcMethod
-	public void registered(Registered message) {
+	protected void registered(Registered message) {
 		connectionMonitor.tell(message, selfActor);
 		try {
 			workerStore.setFrameworkID(Option.apply(message.frameworkId()));
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			onFatalError(new ResourceManagerException("Unable to store the assigned framework ID.", ex));
 			return;
 		}
@@ -409,8 +408,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 	/**
 	 * Called when reconnected to Mesos following a failover event.
 	 */
-	@RpcMethod
-	public void reregistered(ReRegistered message) {
+	protected void reregistered(ReRegistered message) {
 		connectionMonitor.tell(message, selfActor);
 		launchCoordinator.tell(message, selfActor);
 		reconciliationCoordinator.tell(message, selfActor);
@@ -420,8 +418,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 	/**
 	 * Called when disconnected from Mesos.
 	 */
-	@RpcMethod
-	public void disconnected(Disconnected message) {
+	protected void disconnected(Disconnected message) {
 		connectionMonitor.tell(message, selfActor);
 		launchCoordinator.tell(message, selfActor);
 		reconciliationCoordinator.tell(message, selfActor);
@@ -431,26 +428,38 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 	/**
 	 * Called when resource offers are made to the framework.
 	 */
-	@RpcMethod
-	public void resourceOffers(ResourceOffers message) {
+	protected void resourceOffers(ResourceOffers message) {
 		launchCoordinator.tell(message, selfActor);
 	}
 
 	/**
 	 * Called when resource offers are rescinded.
 	 */
-	@RpcMethod
-	public void offerRescinded(OfferRescinded message) {
+	protected void offerRescinded(OfferRescinded message) {
 		launchCoordinator.tell(message, selfActor);
 	}
 
 	/**
+	 * Handles a task status update from Mesos.
+	 */
+	protected void statusUpdate(StatusUpdate message) {
+		taskMonitor.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		schedulerDriver.acknowledgeStatusUpdate(message.status());
+	}
+
+	protected void frameworkMessage(FrameworkMessage message) {}
+
+	protected void slaveLost(SlaveLost message) {}
+
+	protected void executorLost(ExecutorLost message) {}
+
+	/**
 	 * Accept offers as advised by the launch coordinator.
 	 *
-	 * Acceptance is routed through the RM to update the persistent state before
+	 * <p>Acceptance is routed through the RM to update the persistent state before
 	 * forwarding the message to Mesos.
 	 */
-	@RpcMethod
 	public void acceptOffers(AcceptOffers msg) {
 		try {
 			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size());
@@ -481,26 +490,14 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 			// send the acceptance message to Mesos
 			schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			onFatalError(new ResourceManagerException("unable to accept offers", ex));
 		}
 	}
 
 	/**
-	 * Handles a task status update from Mesos.
-	 */
-	@RpcMethod
-	public void statusUpdate(StatusUpdate message) {
-		taskMonitor.tell(message, selfActor);
-		reconciliationCoordinator.tell(message, selfActor);
-		schedulerDriver.acknowledgeStatusUpdate(message.status());
-	}
-
-	/**
 	 * Handles a reconciliation request from a task monitor.
 	 */
-	@RpcMethod
 	public void reconcile(ReconciliationCoordinator.Reconcile message) {
 		// forward to the reconciliation coordinator
 		reconciliationCoordinator.tell(message, selfActor);
@@ -509,7 +506,6 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 	/**
 	 * Handles a termination notification from a task monitor.
 	 */
-	@RpcMethod
 	public void taskTerminated(TaskMonitor.TaskTerminated message) {
 		Protos.TaskID taskID = message.taskID();
 		Protos.TaskStatus status = message.status();
@@ -520,13 +516,12 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		boolean existed;
 		try {
 			existed = workerStore.removeWorker(taskID);
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			onFatalError(new ResourceManagerException("unable to remove worker", ex));
 			return;
 		}
 
-		if(!existed) {
+		if (!existed) {
 			LOG.info("Received a termination notice for an unrecognized worker: {}", id);
 			return;
 		}
@@ -550,23 +545,6 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		closeTaskManagerConnection(id, new Exception(status.getMessage()));
 	}
 
-	@RpcMethod
-	public void frameworkMessage(FrameworkMessage message) {}
-
-	@RpcMethod
-	public void slaveLost(SlaveLost message) {}
-
-	@RpcMethod
-	public void executorLost(ExecutorLost message) {}
-
-	/**
-	 * Called when an error is reported by the scheduler callback.
-	 */
-	@RpcMethod
-	public void error(Error message) {
-		onFatalError(new ResourceManagerException("Connection to Mesos failed", new Exception(message.message())));
-	}
-
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -650,29 +628,139 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		};
 	}
 
+	private class MesosResourceManagerSchedulerCallback implements Scheduler {
+
+		@Override
+		public void registered(SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.registered(new Registered(frameworkId, masterInfo));
+				}
+			});
+		}
+
+		@Override
+		public void reregistered(SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.reregistered(new ReRegistered(masterInfo));
+				}
+			});
+		}
+
+		@Override
+		public void resourceOffers(SchedulerDriver driver, final List<Protos.Offer> offers) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.resourceOffers(new ResourceOffers(offers));
+				}
+			});
+		}
+
+		@Override
+		public void offerRescinded(SchedulerDriver driver, final Protos.OfferID offerId) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.offerRescinded(new OfferRescinded(offerId));
+				}
+			});
+		}
+
+		@Override
+		public void statusUpdate(SchedulerDriver driver, final Protos.TaskStatus status) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.statusUpdate(new StatusUpdate(status));
+				}
+			});
+		}
+
+		@Override
+		public void frameworkMessage(SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] data) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.frameworkMessage(new FrameworkMessage(executorId, slaveId, data));
+				}
+			});
+		}
+
+		@Override
+		public void disconnected(SchedulerDriver driver) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.disconnected(new Disconnected());
+				}
+			});
+		}
+
+		@Override
+		public void slaveLost(SchedulerDriver driver, final Protos.SlaveID slaveId) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.slaveLost(new SlaveLost(slaveId));
+				}
+			});
+		}
+
+		@Override
+		public void executorLost(SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final int status) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.executorLost(new ExecutorLost(executorId, slaveId, status));
+				}
+			});
+		}
+
+		@Override
+		public void error(SchedulerDriver driver, final String message) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					onFatalError(new ResourceManagerException(message));
+				}
+			});
+		}
+	}
+
 	/**
 	 * Adapts incoming Akka messages as RPC calls to the resource manager.
 	 */
-	static class AkkaAdapter extends UntypedActor {
-		private final MesosResourceManagerGateway gateway;
-		AkkaAdapter(MesosResourceManagerGateway gateway) {
-			this.gateway = gateway;
-		}
+	private class AkkaAdapter extends UntypedActor {
 		@Override
-		public void onReceive(Object message) throws Exception {
+		public void onReceive(final Object message) throws Exception {
 			if (message instanceof ReconciliationCoordinator.Reconcile) {
-				gateway.reconcile((ReconciliationCoordinator.Reconcile) message);
+				runAsync(new Runnable() {
+					@Override
+					public void run() {
+						reconcile((ReconciliationCoordinator.Reconcile) message);
+					}
+				});
 			} else if (message instanceof TaskMonitor.TaskTerminated) {
-				gateway.taskTerminated((TaskMonitor.TaskTerminated) message);
+				runAsync(new Runnable() {
+					@Override
+					public void run() {
+						taskTerminated((TaskMonitor.TaskTerminated) message);
+					}
+				});
 			} else if (message instanceof AcceptOffers) {
-				gateway.acceptOffers((AcceptOffers) message);
+				runAsync(new Runnable() {
+					@Override
+					public void run() {
+						acceptOffers((AcceptOffers) message);
+					}
+				});
 			} else {
 				MesosResourceManager.LOG.error("unrecognized message: " + message);
 			}
 		}
-
-		public static Props createActorProps(MesosResourceManagerGateway gateway) {
-			return Props.create(AkkaAdapter.class, gateway);
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerActions.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerActions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerActions.java
new file mode 100644
index 0000000..e1b0300
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerActions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+
+/**
+ * Actions defined by the MesosResourceManager.
+ *
+ * <p>These are called by the MesosResourceManager components such
+ * as {@link LaunchCoordinator}, and {@link TaskMonitor}.
+ */
+public interface MesosResourceManagerActions {
+
+	/**
+	 * Accept the given offers as advised by the launch coordinator.
+	 *
+	 * <p>Note: This method is a callback for the {@link LaunchCoordinator}.
+	 *
+	 * @param offersToAccept Offers to accept from Mesos
+	 */
+	void acceptOffers(AcceptOffers offersToAccept);
+
+	/**
+	 * Trigger reconciliation with the Mesos master.
+	 *
+	 * <p>Note: This method is a callback for the {@link TaskMonitor}.
+	 *
+	 * @param reconciliationRequest Message containing the tasks which shall be reconciled
+	 */
+	void reconcile(ReconciliationCoordinator.Reconcile reconciliationRequest);
+
+	/**
+	 * Notify that the given Mesos task has been terminated.
+	 *
+	 * <p>Note: This method is a callback for the {@link TaskMonitor}.
+	 *
+	 * @param terminatedTask Message containing the terminated task
+	 */
+	void taskTerminated(TaskMonitor.TaskTerminated terminatedTask);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
deleted file mode 100644
index 70ed47d..0000000
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.runtime.clusterframework;
-
-import org.apache.flink.mesos.scheduler.LaunchCoordinator;
-import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
-import org.apache.flink.mesos.scheduler.SchedulerGateway;
-import org.apache.flink.mesos.scheduler.TaskMonitor;
-import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-
-/**
- * The {@link MesosResourceManager}'s RPC gateway interface.
- */
-public interface MesosResourceManagerGateway extends ResourceManagerGateway, SchedulerGateway {
-
-	/**
-	 * Accept the given offers as advised by the launch coordinator.
-	 *
-	 * Note: This method is a callback for the {@link LaunchCoordinator}.
-	 *
-	 * @param offersToAccept Offers to accept from Mesos
-	 */
-	void acceptOffers(AcceptOffers offersToAccept);
-
-	/**
-	 * Trigger reconciliation with the Mesos master.
-	 *
-	 * Note: This method is a callback for the {@link TaskMonitor}.
-	 *
-	 * @param reconciliationRequest Message containing the tasks which shall be reconciled
-	 */
-	void reconcile(ReconciliationCoordinator.Reconcile reconciliationRequest);
-
-	/**
-	 * Notify that the given Mesos task has been terminated.
-	 *
-	 * Note: This method is a callback for the {@link TaskMonitor}.
-	 *
-	 * @param terminatedTask Message containing the terminated task
-	 */
-	void taskTerminated(TaskMonitor.TaskTerminated terminatedTask);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
index a9ec892..06aeb59 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.mesos.runtime.clusterframework.store;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
 import org.apache.mesos.Protos;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java
deleted file mode 100644
index 0dea4e7..0000000
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.scheduler;
-
-import org.apache.flink.mesos.scheduler.messages.Disconnected;
-import org.apache.flink.mesos.scheduler.messages.Error;
-import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
-import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
-import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
-import org.apache.flink.mesos.scheduler.messages.ReRegistered;
-import org.apache.flink.mesos.scheduler.messages.Registered;
-import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
-import org.apache.flink.mesos.scheduler.messages.SlaveLost;
-import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.flink.runtime.rpc.RpcGateway;
-
-/**
- * A scheduler's RPC gateway interface.
- *
- * Implemented by RPC endpoints that accept Mesos scheduler messages.
- */
-public interface SchedulerGateway extends RpcGateway {
-
-	/**
-	 * Called when connected to Mesos as a new framework.
-	 */
-	void registered(Registered message);
-
-	/**
-	 * Called when reconnected to Mesos following a failover event.
-	 */
-	void reregistered(ReRegistered message);
-
-	/**
-	 * Called when disconnected from Mesos.
-	 */
-	void disconnected(Disconnected message);
-
-	/**
-	 * Called when resource offers are made to the framework.
-	 */
-	void resourceOffers(ResourceOffers message);
-
-	/**
-	 * Called when resource offers are rescinded.
-	 */
-	void offerRescinded(OfferRescinded message);
-
-	/**
-	 * Called when a status update arrives from the Mesos master.
-	 */
-	void statusUpdate(StatusUpdate message);
-
-	/**
-	 * Called when a framework message arrives from a custom Mesos task executor.
-	 */
-	void frameworkMessage(FrameworkMessage message);
-
-	/**
-	 * Called when a Mesos slave is lost.
-	 */
-	void slaveLost(SlaveLost message);
-
-	/**
-	 * Called when a custom Mesos task executor is lost.
-	 */
-	void executorLost(ExecutorLost message);
-
-	/**
-	 * Called when an error is reported by the scheduler callback.
-	 */
-	void error(Error message);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java
deleted file mode 100644
index 21c8346..0000000
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.scheduler;
-
-import org.apache.flink.mesos.scheduler.messages.Disconnected;
-import org.apache.flink.mesos.scheduler.messages.Error;
-import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
-import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
-import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
-import org.apache.flink.mesos.scheduler.messages.ReRegistered;
-import org.apache.flink.mesos.scheduler.messages.Registered;
-import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
-import org.apache.flink.mesos.scheduler.messages.SlaveLost;
-import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Scheduler;
-import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
-import org.apache.mesos.SchedulerDriver;
-
-import java.util.List;
-
-/**
- * This class reacts to callbacks from the Mesos scheduler driver.
- *
- * Forwards incoming messages to the {@link MesosResourceManager} RPC gateway.
- *
- * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
- */
-public class SchedulerProxyV2 implements Scheduler {
-
-	/** The actor to which we report the callbacks */
-	private final SchedulerGateway gateway;
-
-	public SchedulerProxyV2(SchedulerGateway gateway) {
-		this.gateway = gateway;
-	}
-
-	@Override
-	public void registered(SchedulerDriver driver, Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo) {
-		gateway.registered(new Registered(frameworkId, masterInfo));
-	}
-
-	@Override
-	public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) {
-		gateway.reregistered(new ReRegistered(masterInfo));
-	}
-
-	@Override
-	public void disconnected(SchedulerDriver driver) {
-		gateway.disconnected(new Disconnected());
-	}
-
-	@Override
-	public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
-		gateway.resourceOffers(new ResourceOffers(offers));
-	}
-
-	@Override
-	public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) {
-		gateway.offerRescinded(new OfferRescinded(offerId));
-	}
-
-	@Override
-	public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) {
-		gateway.statusUpdate(new StatusUpdate(status));
-	}
-
-	@Override
-	public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, byte[] data) {
-		gateway.frameworkMessage(new FrameworkMessage(executorId, slaveId, data));
-	}
-
-	@Override
-	public void slaveLost(SchedulerDriver driver, Protos.SlaveID slaveId) {
-		gateway.slaveLost(new SlaveLost(slaveId));
-	}
-
-	@Override
-	public void executorLost(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, int status) {
-		gateway.executorLost(new ExecutorLost(executorId, slaveId, status));
-	}
-
-	@Override
-	public void error(SchedulerDriver driver, String message) {
-		gateway.error(new Error(message));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 929e927..6e6a59c 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -18,23 +18,20 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestProbe;
-import com.netflix.fenzo.ConstraintEvaluator;
-import junit.framework.AssertionFailedError;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
-import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
-import org.apache.flink.mesos.scheduler.messages.*;
-import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -55,7 +52,9 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.resourcemanager.*;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -66,30 +65,51 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import com.netflix.fenzo.ConstraintEvaluator;
+import junit.framework.AssertionFailedError;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.collection.JavaConverters;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+
 import static java.util.Collections.singletonList;
 import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState;
 import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID;
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * General tests for the Mesos resource manager component (v2).
@@ -151,13 +171,24 @@ public class MesosResourceManagerTest extends TestLogger {
 		}
 
 		@Override
-		protected ActorRef createConnectionMonitor() { return connectionMonitor.ref(); }
+		protected ActorRef createConnectionMonitor() {
+			return connectionMonitor.ref();
+		}
+
 		@Override
-		protected ActorRef createTaskRouter() { return taskRouter.ref(); }
+		protected ActorRef createTaskMonitor(SchedulerDriver schedulerDriver) {
+			return taskRouter.ref();
+		}
+
 		@Override
-		protected ActorRef createLaunchCoordinator() { return launchCoordinator.ref(); }
+		protected ActorRef createLaunchCoordinator(SchedulerDriver schedulerDriver, ActorRef selfActorRef) {
+			return launchCoordinator.ref();
+		}
+
 		@Override
-		protected ActorRef createReconciliationCoordinator() { return reconciliationCoordinator.ref(); }
+		protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriver) {
+			return reconciliationCoordinator.ref();
+		}
 
 		@Override
 		protected void closeTaskManagerConnection(ResourceID resourceID, Exception cause) {
@@ -179,7 +210,7 @@ public class MesosResourceManagerTest extends TestLogger {
 		// RM
 		ResourceManagerConfiguration rmConfiguration;
 		ResourceID rmResourceID;
-		static final String rmAddress = "/resourceManager";
+		static final String RM_ADDRESS = "/resourceManager";
 		TestingMesosResourceManager resourceManager;
 
 		// domain objects for test purposes
@@ -227,7 +258,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			resourceManager =
 				new TestingMesosResourceManager(
 					rpcService,
-					rmAddress,
+					RM_ADDRESS,
 					rmResourceID,
 					rmConfiguration,
 					rmServices.highAvailabilityServices,
@@ -252,7 +283,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			task3Executor = mockTaskExecutor(task3);
 
 			// JobMaster
-			jobMaster1 = mockJobMaster(rmServices, new JobID(1,0));
+			jobMaster1 = mockJobMaster(rmServices, new JobID(1, 0));
 		}
 
 		/**
@@ -290,7 +321,7 @@ public class MesosResourceManagerTest extends TestLogger {
 						rmActions = invocation.getArgumentAt(2, ResourceManagerActions.class);
 						return null;
 					}
-				}).when(slotManager).start(any(UUID.class),any(Executor.class),any(ResourceManagerActions.class));
+				}).when(slotManager).start(any(UUID.class), any(Executor.class), any(ResourceManagerActions.class));
 
 				when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenReturn(true);
 			}
@@ -669,41 +700,4 @@ public class MesosResourceManagerTest extends TestLogger {
 			resourceManager.taskRouter.expectMsgClass(Disconnected.class);
 		}};
 	}
-
-	/**
-	 * Test Mesos scheduler error.
-	 */
-	@Test
-	public void testError() throws Exception {
-		new Context() {{
-			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
-			startResourceManager();
-			resourceManager.error(new Error("test"));
-			assertTrue(fatalErrorHandler.hasExceptionOccurred());
-		}};
-	}
-
-	@Test
-	public void testAdapter() throws Exception {
-		Protos.TaskID task1 = Protos.TaskID.newBuilder().setValue("taskmanager-00001").build();
-		Protos.TaskStatus status1 = Protos.TaskStatus.newBuilder().setTaskId(task1).setState(Protos.TaskState.TASK_KILLED).build();
-		String host1 = "host1";
-
-		MesosResourceManagerGateway gateway = mock(MesosResourceManagerGateway.class);
-		ActorRef adapter = system.actorOf(MesosResourceManager.AkkaAdapter.createActorProps(gateway));
-
-		List<Protos.TaskStatus> tasks = Collections.singletonList(status1);
-		ReconciliationCoordinator.Reconcile msg1 = new ReconciliationCoordinator.Reconcile(
-			JavaConverters.asScalaBufferConverter(tasks).asScala(), false);
-		adapter.tell(msg1, ActorRef.noSender());
-		verify(gateway, timeout(1000L)).reconcile(eq(msg1));
-
-		TaskMonitor.TaskTerminated msg2 = new TaskMonitor.TaskTerminated(task1, status1);
-		adapter.tell(msg2, ActorRef.noSender());
-		verify(gateway, timeout(1000L)).taskTerminated(eq(msg2));
-
-		AcceptOffers msg3 = new AcceptOffers(host1, Collections.<Protos.OfferID>emptyList(), Collections.<Protos.Offer.Operation>emptyList());
-		adapter.tell(msg3, ActorRef.noSender());
-		verify(gateway, timeout(1000L)).acceptOffers(eq(msg3));
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index df452e3..6e7c6af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -83,8 +83,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-public abstract class ResourceManager<C extends ResourceManagerGateway, WorkerType extends Serializable>
-		extends RpcEndpoint<C>
+public abstract class ResourceManager<WorkerType extends Serializable>
+		extends RpcEndpoint<ResourceManagerGateway>
 		implements LeaderContender {
 
 	public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
@@ -609,8 +609,13 @@ public abstract class ResourceManager<C extends ResourceManagerGateway, WorkerTy
 	 */
 	@RpcMethod
 	public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
-		log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
-		shutDownApplication(finalStatus, optionalDiagnostics);
+		log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, optionalDiagnostics);
+
+		try {
+			shutDownApplication(finalStatus, optionalDiagnostics);
+		} catch (ResourceManagerException e) {
+			log.warn("Could not properly shutdown the application.", e);
+		}
 	}
 
 	@RpcMethod
@@ -880,8 +885,9 @@ public abstract class ResourceManager<C extends ResourceManagerGateway, WorkerTy
 	 *
 	 * @param finalStatus The application status to report.
 	 * @param optionalDiagnostics An optional diagnostics message.
+	 * @throws ResourceManagerException if the application could not be shut down.
 	 */
-	protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics);
+	protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) throws ResourceManagerException;
 
 	/**
 	 * Allocates a resource using the resource profile.

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 12d3a7d..d0c411c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -42,7 +42,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 
 	private final ResourceManagerRuntimeServices resourceManagerRuntimeServices;
 
-	private final ResourceManager<? extends ResourceManagerGateway, ?> resourceManager;
+	private final ResourceManager<?> resourceManager;
 
 	public ResourceManagerRunner(
 			final ResourceID resourceId,

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index afeddc4..a921a29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.rpc.RpcService;
  *
  * This ResourceManager doesn't acquire new resources.
  */
-public class StandaloneResourceManager extends ResourceManager<StandaloneResourceManagerGateway, ResourceID> {
+public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
 	public StandaloneResourceManager(
 			RpcService rpcService,

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java
deleted file mode 100644
index 6c8de66..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-/**
- * The {@link StandaloneResourceManager}'s RPC gateway interface.
- */
-public interface StandaloneResourceManagerGateway extends ResourceManagerGateway {
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 85ed950..6a0bd87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -47,7 +47,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -127,7 +126,7 @@ public class TaskExecutorITCase extends TestLogger {
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime());
 
-		ResourceManager<StandaloneResourceManagerGateway,ResourceID> resourceManager = new StandaloneResourceManager(
+		ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
 			rpcService,
 			FlinkResourceManager.RESOURCE_MANAGER_NAME,
 			rmResourceId,

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index cef3378..2ad9065 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -193,7 +193,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
 	}
 
-	private ResourceManager<?,?> createResourceManager(Configuration config) throws Exception {
+	private ResourceManager<?> createResourceManager(Configuration config) throws Exception {
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
 		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(config);
 		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 4ee30f4..6099d18 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -63,7 +63,7 @@ import scala.concurrent.duration.FiniteDuration;
  * The yarn implementation of the resource manager. Used when the system is started
  * via the resource framework YARN.
  */
-public class YarnResourceManager extends ResourceManager<YarnResourceManagerGateway, ResourceID> implements AMRMClientAsync.CallbackHandler {
+public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
 
 	/** The process environment variables. */
 	private final Map<String, String> env;

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java
deleted file mode 100644
index 485fb90..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-
-/**
- * The {@link YarnResourceManager}'s RPC gateway interface.
- */
-public interface YarnResourceManagerGateway extends ResourceManagerGateway {
-}


[2/4] flink git commit: [FLINK-6379] [mesos] Add Mesos ResourceManager (FLIP-6)

Posted by tr...@apache.org.
[FLINK-6379] [mesos] Add Mesos ResourceManager (FLIP-6)

- Make the RPC gateway of the ResourceManager extensible to allow for framework-specific RPC methods
- Introduce FLIP-6 MesosResourceManager with tests
- Introduce a Mesos-specific RPC gateway for callbacks from child actors and from the Mesos scheduler client
- Enhance the persistent Mesos worker store to track the resource profile associated with a worker
- Convert RegisteredMesosWorkerNode to Java
- Decline TaskExecutor registration if the framework does not recognize the worker

This closes #3942.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fe27ac0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fe27ac0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fe27ac0

Branch: refs/heads/master
Commit: 3fe27ac0796bf7fffce0901954bc2eec3eee02be
Parents: b59148c
Author: Wright, Eron <Er...@emc.com>
Authored: Thu May 18 17:34:16 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 29 08:41:12 2017 +0200

----------------------------------------------------------------------
 .../MesosFlinkResourceManager.java              |   2 +-
 .../clusterframework/MesosResourceManager.java  | 677 +++++++++++++++++
 .../MesosResourceManagerGateway.java            |  37 +
 .../RegisteredMesosWorkerNode.java              |  58 ++
 .../RegisteredMesosWorkerNode.scala             |  33 -
 .../store/MesosWorkerStore.java                 |  34 +-
 .../flink/mesos/scheduler/SchedulerGateway.java |  89 +++
 .../flink/mesos/scheduler/SchedulerProxyV2.java | 103 +++
 .../MesosResourceManagerTest.java               | 736 +++++++++++++++++++
 .../resourcemanager/ResourceManager.java        |  11 +-
 .../resourcemanager/ResourceManagerRunner.java  |   2 +-
 .../StandaloneResourceManager.java              |   2 +-
 .../StandaloneResourceManagerGateway.java       |  25 +
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |   2 +-
 .../taskexecutor/TaskExecutorITCase.java        |   3 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |   2 +-
 .../apache/flink/yarn/YarnResourceManager.java  |   2 +-
 .../flink/yarn/YarnResourceManagerGateway.java  |  27 +
 18 files changed, 1799 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index 6c708fa..d6b5c9d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -498,7 +498,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	 */
 	@Override
 	protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) {
-		releaseWorker(worker.task());
+		releaseWorker(worker.getWorker());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
new file mode 100644
index 0000000..71bfacd
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxyV2;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
+import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.SlaveLost;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The Mesos implementation of the resource manager.
+ */
+public class MesosResourceManager extends ResourceManager<MesosResourceManagerGateway, RegisteredMesosWorkerNode> {
+	protected static final Logger LOG = LoggerFactory.getLogger(MesosResourceManager.class);
+
+	/** The Flink configuration */
+	private final Configuration flinkConfig;
+
+	/** The Mesos configuration (master and framework info) */
+	private final MesosConfiguration mesosConfig;
+
+	/** The TaskManager container parameters (like container memory size) */
+	private final MesosTaskManagerParameters taskManagerParameters;
+
+	/** Container specification for launching a TM */
+	private final ContainerSpecification taskManagerContainerSpec;
+
+	/** Resolver for HTTP artifacts */
+	private final MesosArtifactResolver artifactResolver;
+
+	/** Persistent storage of allocated containers */
+	private final MesosWorkerStore workerStore;
+
+	/** A local actor system for using the helper actors */
+	private final ActorSystem actorSystem;
+
+	/** Callback handler for the asynchronous Mesos scheduler */
+	private SchedulerProxyV2 schedulerCallbackHandler;
+
+	/** Mesos scheduler driver */
+	private SchedulerDriver schedulerDriver;
+
+	/** an adapter to receive messages from Akka actors */
+	@VisibleForTesting
+	ActorRef selfActor;
+
+	private ActorRef connectionMonitor;
+
+	private ActorRef taskRouter;
+
+	private ActorRef launchCoordinator;
+
+	private ActorRef reconciliationCoordinator;
+
+	/** planning state related to workers - package private for unit test purposes */
+	final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
+	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
+	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
+
+	public MesosResourceManager(
+			// base class
+			RpcService rpcService,
+			String resourceManagerEndpointId,
+			ResourceID resourceId,
+			ResourceManagerConfiguration resourceManagerConfiguration,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			SlotManager slotManager,
+			MetricRegistry metricRegistry,
+			JobLeaderIdService jobLeaderIdService,
+			FatalErrorHandler fatalErrorHandler,
+			// Mesos specifics
+			ActorSystem actorSystem,
+			Configuration flinkConfig,
+			MesosConfiguration mesosConfig,
+			MesosWorkerStore workerStore,
+			MesosTaskManagerParameters taskManagerParameters,
+			ContainerSpecification taskManagerContainerSpec,
+			MesosArtifactResolver artifactResolver) {
+		super(
+			rpcService,
+			resourceManagerEndpointId,
+			resourceId,
+			resourceManagerConfiguration,
+			highAvailabilityServices,
+			heartbeatServices,
+			slotManager,
+			metricRegistry,
+			jobLeaderIdService,
+			fatalErrorHandler);
+
+		this.actorSystem = actorSystem;
+
+		this.flinkConfig = requireNonNull(flinkConfig);
+		this.mesosConfig = requireNonNull(mesosConfig);
+
+		this.workerStore = requireNonNull(workerStore);
+		this.artifactResolver = requireNonNull(artifactResolver);
+
+		this.taskManagerParameters = requireNonNull(taskManagerParameters);
+		this.taskManagerContainerSpec = requireNonNull(taskManagerContainerSpec);
+
+		this.workersInNew = new HashMap<>();
+		this.workersInLaunch = new HashMap<>();
+		this.workersBeingReturned = new HashMap<>();
+	}
+
+	protected ActorRef createSelfActor() {
+		return actorSystem.actorOf(
+			AkkaAdapter.createActorProps(getSelf()),"ResourceManager");
+	}
+
+	protected ActorRef createConnectionMonitor() {
+		return actorSystem.actorOf(
+			ConnectionMonitor.createActorProps(ConnectionMonitor.class, flinkConfig),
+			"connectionMonitor");
+	}
+
+	protected ActorRef createTaskRouter() {
+		return actorSystem.actorOf(
+			Tasks.createActorProps(Tasks.class, flinkConfig, schedulerDriver, TaskMonitor.class),
+			"tasks");
+	}
+
+	protected ActorRef createLaunchCoordinator() {
+		return actorSystem.actorOf(
+			LaunchCoordinator.createActorProps(LaunchCoordinator.class, selfActor, flinkConfig, schedulerDriver, createOptimizer()),
+			"launchCoordinator");
+	}
+
+	protected ActorRef createReconciliationCoordinator() {
+		return actorSystem.actorOf(
+			ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, flinkConfig, schedulerDriver),
+			"reconciliationCoordinator");
+	}
+
+	// ------------------------------------------------------------------------
+	//  Resource Manager overrides
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Starts the Mesos-specifics.
+	 */
+	@Override
+	protected void initialize() throws ResourceManagerException {
+		// start the worker store
+		try {
+			workerStore.start();
+		}
+		catch(Exception e) {
+			throw new ResourceManagerException("Unable to initialize the worker store.", e);
+		}
+
+		// create the scheduler driver to communicate with Mesos
+		schedulerCallbackHandler = new SchedulerProxyV2(getSelf());
+
+		// register with Mesos
+		// TODO : defer connection until RM acquires leadership
+
+		Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
+			.clone()
+			.setCheckpoint(true);
+		try {
+			Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
+			if (frameworkID.isEmpty()) {
+				LOG.info("Registering as new framework.");
+			} else {
+				LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
+				frameworkInfo.setId(frameworkID.get());
+			}
+		}
+		catch(Exception e) {
+			throw new ResourceManagerException("Unable to recover the framework ID.", e);
+		}
+
+		MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+		MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
+		schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
+
+		// create supporting actors
+		selfActor = createSelfActor();
+		connectionMonitor = createConnectionMonitor();
+		launchCoordinator = createLaunchCoordinator();
+		reconciliationCoordinator = createReconciliationCoordinator();
+		taskRouter = createTaskRouter();
+
+		// recover state
+		try {
+			recoverWorkers();
+		}
+		catch(Exception e) {
+			throw new ResourceManagerException("Unable to recover Mesos worker state.", e);
+		}
+
+		// begin scheduling
+		connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+		schedulerDriver.start();
+
+		LOG.info("Mesos resource manager initialized.");
+	}
+
+	/**
+	 * Recover framework/worker information persisted by a prior incarnation of the RM.
+	 */
+	private void recoverWorkers() throws Exception {
+		// if this resource manager is recovering from failure,
+		// then some worker tasks are most likely still alive and we can re-obtain them
+		final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+
+		assert(workersInNew.isEmpty());
+		assert(workersInLaunch.isEmpty());
+		assert(workersBeingReturned.isEmpty());
+
+		if (!tasksFromPreviousAttempts.isEmpty()) {
+			LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
+
+			List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
+
+			for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
+
+				switch(worker.state()) {
+					case New:
+						// remove new workers because allocation requests are transient
+						workerStore.removeWorker(worker.taskID());
+						break;
+					case Launched:
+						workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+						toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
+						break;
+					case Released:
+						workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
+						break;
+				}
+				taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
+			}
+
+			// tell the launch coordinator about prior assignments
+			if(toAssign.size() >= 1) {
+				launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), selfActor);
+			}
+		}
+	}
+
+	@Override
+	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+		LOG.info("Shutting down and unregistering as a Mesos framework.");
+		try {
+			// unregister the framework, which implicitly removes all tasks.
+			schedulerDriver.stop(false);
+		}
+		catch(Exception ex) {
+			LOG.warn("unable to unregister the framework", ex);
+		}
+
+		try {
+			workerStore.stop(true);
+		}
+		catch(Exception ex) {
+			LOG.warn("unable to stop the worker state store", ex);
+		}
+
+		LOG.info("Shutdown completed.");
+	}
+
+	@Override
+	public void startNewWorker(ResourceProfile resourceProfile) {
+		LOG.info("Starting a new worker.");
+		try {
+			// generate new workers into persistent state and launch associated actors
+			MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newWorker(workerStore.newTaskID(), resourceProfile);
+			workerStore.putWorker(worker);
+			workersInNew.put(extractResourceID(worker.taskID()), worker);
+
+			LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), resourceProfile);
+
+			LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).",
+				launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
+
+			// tell the task router about the new plans
+			taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
+
+			// tell the launch coordinator to launch the new tasks
+			launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor);
+		}
+		catch(Exception ex) {
+			onFatalErrorAsync(new ResourceManagerException("unable to request new workers", ex));
+		}
+	}
+
+	@Override
+	public void stopWorker(InstanceID instanceId) {
+		// TODO implement worker release
+	}
+
+	/**
+	 * Callback when a worker was started.
+	 * @param resourceID The worker resource id (as provided by the TaskExecutor)
+	 */
+	@Override
+	protected RegisteredMesosWorkerNode workerStarted(ResourceID resourceID) {
+
+		// note: this may occur more than once for a given worker.
+		MesosWorkerStore.Worker inLaunch = workersInLaunch.get(resourceID);
+		if (inLaunch != null) {
+			return new RegisteredMesosWorkerNode(inLaunch);
+		}
+
+		// the worker is unrecognized or was already released
+		// return null to indicate that TaskExecutor registration should be declined
+		return null;
+	}
+
+	// ------------------------------------------------------------------------
+	//  RPC methods
+	// ------------------------------------------------------------------------
+
+	@RpcMethod
+	public void registered(Registered message) {
+		connectionMonitor.tell(message, selfActor);
+		try {
+			workerStore.setFrameworkID(Option.apply(message.frameworkId()));
+		}
+		catch(Exception ex) {
+			onFatalError(new ResourceManagerException("unable to store the assigned framework ID", ex));
+			return;
+		}
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskRouter.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when reconnected to Mesos following a failover event.
+	 */
+	@RpcMethod
+	public void reregistered(ReRegistered message) {
+		connectionMonitor.tell(message, selfActor);
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskRouter.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when disconnected from Mesos.
+	 */
+	@RpcMethod
+	public void disconnected(Disconnected message) {
+		connectionMonitor.tell(message, selfActor);
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskRouter.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when resource offers are made to the framework.
+	 */
+	@RpcMethod
+	public void resourceOffers(ResourceOffers message) {
+		launchCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when resource offers are rescinded.
+	 */
+	@RpcMethod
+	public void offerRescinded(OfferRescinded message) {
+		launchCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Accept offers as advised by the launch coordinator.
+	 *
+	 * Acceptance is routed through the RM to update the persistent state before
+	 * forwarding the message to Mesos.
+	 */
+	@RpcMethod
+	public void acceptOffers(AcceptOffers msg) {
+		try {
+			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size());
+
+			// transition the persistent state of some tasks to Launched
+			for (Protos.Offer.Operation op : msg.operations()) {
+				if (op.getType() != Protos.Offer.Operation.Type.LAUNCH) {
+					continue;
+				}
+				for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
+					MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId()));
+					assert (worker != null);
+
+					worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
+					workerStore.putWorker(worker);
+					workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+
+					LOG.info("Launching Mesos task {} on host {}.",
+						worker.taskID().getValue(), worker.hostname().get());
+
+					toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+				}
+			}
+
+			// tell the task router about the new plans
+			for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
+				taskRouter.tell(update, selfActor);
+			}
+
+			// send the acceptance message to Mesos
+			schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
+		}
+		catch(Exception ex) {
+			onFatalError(new ResourceManagerException("unable to accept offers", ex));
+		}
+	}
+
+	/**
+	 * Handles a task status update from Mesos.
+	 */
+	@RpcMethod
+	public void statusUpdate(StatusUpdate message) {
+		taskRouter.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		schedulerDriver.acknowledgeStatusUpdate(message.status());
+	}
+
+	/**
+	 * Handles a reconciliation request from a task monitor.
+	 */
+	@RpcMethod
+	public void reconcile(ReconciliationCoordinator.Reconcile message) {
+		// forward to the reconciliation coordinator
+		reconciliationCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Handles a termination notification from a task monitor.
+	 */
+	@RpcMethod
+	public void taskTerminated(TaskMonitor.TaskTerminated message) {
+		Protos.TaskID taskID = message.taskID();
+		Protos.TaskStatus status = message.status();
+
+		// note: this callback occurs for failed containers and for released containers alike
+		final ResourceID id = extractResourceID(taskID);
+
+		boolean existed;
+		try {
+			existed = workerStore.removeWorker(taskID);
+		}
+		catch(Exception ex) {
+			onFatalError(new ResourceManagerException("unable to remove worker", ex));
+			return;
+		}
+
+		if(!existed) {
+			LOG.info("Received a termination notice for an unrecognized worker: {}", id);
+			return;
+		}
+
+		// check if this is a failed task or a released task
+		assert(!workersInNew.containsKey(id));
+		if (workersBeingReturned.remove(id) != null) {
+			// regular finished worker that we released
+			LOG.info("Worker {} finished successfully with message: {}",
+				id, status.getMessage());
+		} else {
+			// failed worker, either at startup, or running
+			final MesosWorkerStore.Worker launched = workersInLaunch.remove(id);
+			assert(launched != null);
+			LOG.info("Worker {} failed with status: {}, reason: {}, message: {}.",
+				id, status.getState(), status.getReason(), status.getMessage());
+
+			// TODO : launch a replacement worker?
+		}
+
+		closeTaskManagerConnection(id, new Exception(status.getMessage()));
+	}
+
+	@RpcMethod
+	public void frameworkMessage(FrameworkMessage message) {}
+
+	@RpcMethod
+	public void slaveLost(SlaveLost message) {}
+
+	@RpcMethod
+	public void executorLost(ExecutorLost message) {}
+
+	/**
+	 * Called when an error is reported by the scheduler callback.
+	 */
+	@RpcMethod
+	public void error(Error message) {
+		onFatalError(new ResourceManagerException("Connection to Mesos failed", new Exception(message.message())));
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a launchable task for Fenzo to process.
+	 */
+	private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) {
+
+		// create the specific TM parameters from the resource profile and some defaults
+		MesosTaskManagerParameters params = new MesosTaskManagerParameters(
+			resourceProfile.getCpuCores() < 1 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(),
+			taskManagerParameters.containerType(),
+			taskManagerParameters.containerImageName(),
+			new ContaineredTaskManagerParameters(
+				resourceProfile.getMemoryInMB() < 0 ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(),
+				resourceProfile.getHeapMemoryInMB(),
+				resourceProfile.getDirectMemoryInMB(),
+				1,
+				new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
+			taskManagerParameters.containerVolumes(),
+			taskManagerParameters.constraints(),
+			taskManagerParameters.bootstrapCommand(),
+			taskManagerParameters.getTaskManagerHostname()
+		);
+
+		LaunchableMesosWorker launchable =
+			new LaunchableMesosWorker(
+				artifactResolver,
+				params,
+				taskManagerContainerSpec,
+				taskID,
+				mesosConfig);
+
+		return launchable;
+	}
+
+	/**
+	 * Extracts a unique ResourceID from the Mesos task.
+	 *
+	 * @param taskId the Mesos TaskID
+	 * @return The ResourceID for the container
+	 */
+	static ResourceID extractResourceID(Protos.TaskID taskId) {
+		return new ResourceID(taskId.getValue());
+	}
+
+	/**
+	 * Extracts the Mesos task goal state from the worker information.
+	 * @param worker the persistent worker information.
+	 * @return goal state information for the {@link TaskMonitor}.
+	 */
+	static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
+		switch(worker.state()) {
+			case New: return new TaskMonitor.New(worker.taskID());
+			case Launched: return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
+			case Released: return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
+			default: throw new IllegalArgumentException("unsupported worker state");
+		}
+	}
+
+	/**
+	 * Creates the Fenzo optimizer (builder).
+	 * The builder is an indirection to facilitate unit testing of the Launch Coordinator.
+	 */
+	private static TaskSchedulerBuilder createOptimizer() {
+		return new TaskSchedulerBuilder() {
+			TaskScheduler.Builder builder = new TaskScheduler.Builder();
+
+			@Override
+			public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action) {
+				builder.withLeaseRejectAction(action);
+				return this;
+			}
+
+			@Override
+			public TaskScheduler build() {
+				return builder.build();
+			}
+		};
+	}
+
+	/**
+	 * Adapts incoming Akka messages as RPC calls to the resource manager.
+	 */
+	static class AkkaAdapter extends UntypedActor {
+		private final MesosResourceManagerGateway gateway;
+		AkkaAdapter(MesosResourceManagerGateway gateway) {
+			this.gateway = gateway;
+		}
+		@Override
+		public void onReceive(Object message) throws Exception {
+			if (message instanceof ReconciliationCoordinator.Reconcile) {
+				gateway.reconcile((ReconciliationCoordinator.Reconcile) message);
+			} else if (message instanceof TaskMonitor.TaskTerminated) {
+				gateway.taskTerminated((TaskMonitor.TaskTerminated) message);
+			} else if (message instanceof AcceptOffers) {
+				gateway.acceptOffers((AcceptOffers) message);
+			} else {
+				MesosResourceManager.LOG.error("unrecognized message: " + message);
+			}
+		}
+
+		public static Props createActorProps(MesosResourceManagerGateway gateway) {
+			return Props.create(AkkaAdapter.class, gateway);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
new file mode 100644
index 0000000..e353dcc
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerGateway;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+
+/**
+ * The {@link MesosResourceManager}'s RPC gateway interface.
+ */
+public interface MesosResourceManagerGateway extends ResourceManagerGateway, SchedulerGateway {
+
+	void acceptOffers(AcceptOffers msg);
+
+	void reconcile(ReconciliationCoordinator.Reconcile message);
+
+	void taskTerminated(TaskMonitor.TaskTerminated message);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java
new file mode 100644
index 0000000..c65c482
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * A representation of a registered Mesos task managed by the {@link MesosFlinkResourceManager}.
+ */
+public class RegisteredMesosWorkerNode implements Serializable, ResourceIDRetrievable {
+
+	private static final long serialVersionUID = 2;
+
+	private final MesosWorkerStore.Worker worker;
+
+	public RegisteredMesosWorkerNode(MesosWorkerStore.Worker worker) {
+		this.worker = Preconditions.checkNotNull(worker);
+		Preconditions.checkArgument(worker.slaveID().isDefined());
+		Preconditions.checkArgument(worker.hostname().isDefined());
+	}
+
+	public MesosWorkerStore.Worker getWorker() {
+		return worker;
+	}
+
+	@Override
+	public ResourceID getResourceID() {
+		return MesosResourceManager.extractResourceID(worker.taskID());
+	}
+
+	@Override
+	public String toString() {
+		return "RegisteredMesosWorkerNode{" +
+			"worker=" + worker +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala
deleted file mode 100644
index 7ca388f..0000000
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.runtime.clusterframework
-
-import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore
-import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable}
-
-/**
-  * A representation of a registered Mesos task managed by the [[MesosFlinkResourceManager]].
-  */
-case class RegisteredMesosWorkerNode(task: MesosWorkerStore.Worker) extends ResourceIDRetrievable {
-
-  require(task.slaveID().isDefined)
-  require(task.hostname().isDefined)
-
-  override val getResourceID: ResourceID = MesosFlinkResourceManager.extractResourceID(task.taskID())
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
index f1f54ce..e76ff63 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.mesos.runtime.clusterframework.store;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.mesos.Protos;
 
 import java.io.Serializable;
@@ -93,19 +94,24 @@ public interface MesosWorkerStore {
 
 		private final Protos.TaskID taskID;
 
+		private final ResourceProfile profile;
+
 		private final Option<Protos.SlaveID> slaveID;
 
 		private final Option<String> hostname;
 
 		private final WorkerState state;
 
-		private Worker(Protos.TaskID taskID, Option<Protos.SlaveID> slaveID, Option<String> hostname, WorkerState state) {
+		private Worker(Protos.TaskID taskID, ResourceProfile profile,
+				Option<Protos.SlaveID> slaveID, Option<String> hostname, WorkerState state) {
 			requireNonNull(taskID, "taskID");
+			requireNonNull(profile, "profile");
 			requireNonNull(slaveID, "slaveID");
 			requireNonNull(hostname, "hostname");
 			requireNonNull(state, "state");
 
 			this.taskID = taskID;
+			this.profile = profile;
 			this.slaveID = slaveID;
 			this.hostname = hostname;
 			this.state = state;
@@ -119,6 +125,14 @@ public interface MesosWorkerStore {
 		}
 
 		/**
+		 * Get the resource profile associated with the worker.
+		 * @return the resource profile that was supplied when this worker was constructed.
+		 */
+		public ResourceProfile profile() {
+			return profile;
+		}
+
+		/**
 		 * Get the worker's assigned slave ID.
 		 */
 		public Option<Protos.SlaveID> slaveID() {
@@ -148,6 +162,19 @@ public interface MesosWorkerStore {
 		public static Worker newWorker(Protos.TaskID taskID) {
 			return new Worker(
 				taskID,
+				ResourceProfile.UNKNOWN,
+				Option.<Protos.SlaveID>empty(), Option.<String>empty(),
+				WorkerState.New);
+		}
+
+		/**
+		 * Create a new worker (in state New, without slave ID or hostname) with the given taskID and resource profile.
+		 * @return a new worker instance.
+		 */
+		public static Worker newWorker(Protos.TaskID taskID, ResourceProfile profile) {
+			return new Worker(
+				taskID,
+				profile,
 				Option.<Protos.SlaveID>empty(), Option.<String>empty(),
 				WorkerState.New);
 		}
@@ -157,7 +184,7 @@ public interface MesosWorkerStore {
 		 * @return a new worker instance (does not mutate the current instance).
 		 */
 		public Worker launchWorker(Protos.SlaveID slaveID, String hostname) {
-			return new Worker(taskID, Option.apply(slaveID), Option.apply(hostname), WorkerState.Launched);
+			return new Worker(taskID, profile, Option.apply(slaveID), Option.apply(hostname), WorkerState.Launched);
 		}
 
 		/**
@@ -165,7 +192,7 @@ public interface MesosWorkerStore {
 		 * @return a new worker instance (does not mutate the current instance).
 		 */
 		public Worker releaseWorker() {
-			return new Worker(taskID, slaveID, hostname, WorkerState.Released);
+			return new Worker(taskID, profile, slaveID, hostname, WorkerState.Released);
 		}
 
 		@Override
@@ -195,6 +222,7 @@ public interface MesosWorkerStore {
 				", slaveID=" + slaveID +
 				", hostname=" + hostname +
 				", state=" + state +
+				", profile=" + profile +
 				'}';
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java
new file mode 100644
index 0000000..0dea4e7
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler;
+
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
+import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.SlaveLost;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+/**
+ * A scheduler's RPC gateway interface.
+ *
+ * Implemented by RPC endpoints that accept Mesos scheduler messages.
+ */
+public interface SchedulerGateway extends RpcGateway {
+
+	/**
+	 * Called when connected to Mesos as a new framework.
+	 */
+	void registered(Registered message);
+
+	/**
+	 * Called when reconnected to Mesos following a failover event.
+	 */
+	void reregistered(ReRegistered message);
+
+	/**
+	 * Called when disconnected from Mesos.
+	 */
+	void disconnected(Disconnected message);
+
+	/**
+	 * Called when resource offers are made to the framework.
+	 */
+	void resourceOffers(ResourceOffers message);
+
+	/**
+	 * Called when resource offers are rescinded.
+	 */
+	void offerRescinded(OfferRescinded message);
+
+	/**
+	 * Called when a status update arrives from the Mesos master.
+	 */
+	void statusUpdate(StatusUpdate message);
+
+	/**
+	 * Called when a framework message arrives from a custom Mesos task executor.
+	 */
+	void frameworkMessage(FrameworkMessage message);
+
+	/**
+	 * Called when a Mesos slave is lost.
+	 */
+	void slaveLost(SlaveLost message);
+
+	/**
+	 * Called when a custom Mesos task executor is lost.
+	 */
+	void executorLost(ExecutorLost message);
+
+	/**
+	 * Called when an error is reported by the scheduler callback.
+	 */
+	void error(Error message);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java
new file mode 100644
index 0000000..21c8346
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler;
+
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
+import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.SlaveLost;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.mesos.SchedulerDriver;
+
+import java.util.List;
+
+/**
+ * This class reacts to callbacks from the Mesos scheduler driver.
+ *
+ * Forwards incoming messages to the {@link MesosResourceManager} RPC gateway.
+ *
+ * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
+ */
+public class SchedulerProxyV2 implements Scheduler {
+
+	/** The gateway to which we report the callbacks. */
+	private final SchedulerGateway gateway;
+
+	public SchedulerProxyV2(SchedulerGateway gateway) {
+		this.gateway = gateway;
+	}
+
+	@Override
+	public void registered(SchedulerDriver driver, Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo) {
+		gateway.registered(new Registered(frameworkId, masterInfo));
+	}
+
+	@Override
+	public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) {
+		gateway.reregistered(new ReRegistered(masterInfo));
+	}
+
+	@Override
+	public void disconnected(SchedulerDriver driver) {
+		gateway.disconnected(new Disconnected());
+	}
+
+	@Override
+	public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
+		gateway.resourceOffers(new ResourceOffers(offers));
+	}
+
+	@Override
+	public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) {
+		gateway.offerRescinded(new OfferRescinded(offerId));
+	}
+
+	@Override
+	public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) {
+		gateway.statusUpdate(new StatusUpdate(status));
+	}
+
+	@Override
+	public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, byte[] data) {
+		gateway.frameworkMessage(new FrameworkMessage(executorId, slaveId, data));
+	}
+
+	@Override
+	public void slaveLost(SchedulerDriver driver, Protos.SlaveID slaveId) {
+		gateway.slaveLost(new SlaveLost(slaveId));
+	}
+
+	@Override
+	public void executorLost(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, int status) {
+		gateway.executorLost(new ExecutorLost(executorId, slaveId, status));
+	}
+
+	@Override
+	public void error(SchedulerDriver driver, String message) {
+		gateway.error(new Error(message));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
new file mode 100644
index 0000000..0c715ba
--- /dev/null
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import com.netflix.fenzo.ConstraintEvaluator;
+import junit.framework.AssertionFailedError;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.messages.*;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.*;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+import org.junit.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState;
+import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID;
+import static org.hamcrest.Matchers.*;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.*;
+
+/**
+ * General tests for the Mesos resource manager component (v2).
+ */
+public class MesosResourceManagerTest extends TestLogger {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MesosResourceManagerTest.class);
+
+	private static Configuration flinkConfig = new Configuration();
+
+	private static ActorSystem system;
+
+	@Before
+	public void setup() {
+		system = AkkaUtils.createLocalActorSystem(flinkConfig);
+	}
+
+	@After
+	public void teardown() {
+		JavaTestKit.shutdownActorSystem(system);
+	}
+
+	/**
+	 * The RM with some test-specific behavior.
+	 */
+	static class TestingMesosResourceManager extends MesosResourceManager {
+
+		public TestProbe connectionMonitor = new TestProbe(system);
+		public TestProbe taskRouter = new TestProbe(system);
+		public TestProbe launchCoordinator = new TestProbe(system);
+		public TestProbe reconciliationCoordinator = new TestProbe(system);
+
+		public final Set<ResourceID> closedTaskManagerConnections = new HashSet<>();
+
+		public TestingMesosResourceManager(
+			RpcService rpcService,
+			String resourceManagerEndpointId,
+			ResourceID resourceId,
+			ResourceManagerConfiguration resourceManagerConfiguration,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			SlotManager slotManager,
+			MetricRegistry metricRegistry,
+			JobLeaderIdService jobLeaderIdService,
+			FatalErrorHandler fatalErrorHandler,
+			// Mesos specifics
+			ActorSystem actorSystem,
+			Configuration flinkConfig,
+			MesosConfiguration mesosConfig,
+			MesosWorkerStore workerStore,
+			MesosTaskManagerParameters taskManagerParameters,
+			ContainerSpecification taskManagerContainerSpec,
+			MesosArtifactResolver artifactResolver) {
+			super(rpcService, resourceManagerEndpointId, resourceId, resourceManagerConfiguration,
+				highAvailabilityServices, heartbeatServices, slotManager, metricRegistry,
+				jobLeaderIdService, fatalErrorHandler, actorSystem, flinkConfig, mesosConfig, workerStore,
+				taskManagerParameters, taskManagerContainerSpec, artifactResolver);
+		}
+
+		@Override
+		protected ActorRef createConnectionMonitor() { return connectionMonitor.ref(); }
+		@Override
+		protected ActorRef createTaskRouter() { return taskRouter.ref(); }
+		@Override
+		protected ActorRef createLaunchCoordinator() { return launchCoordinator.ref(); }
+		@Override
+		protected ActorRef createReconciliationCoordinator() { return reconciliationCoordinator.ref(); }
+
+		@Override
+		protected void closeTaskManagerConnection(ResourceID resourceID, Exception cause) {
+			super.closeTaskManagerConnection(resourceID, cause);
+			closedTaskManagerConnections.add(resourceID);
+		}
+	}
+
+	/**
+	 * The context fixture.
+	 */
+	static class Context implements AutoCloseable {
+
+		// services
+		TestingSerialRpcService rpcService;
+		TestingFatalErrorHandler fatalErrorHandler;
+		MockMesosResourceManagerRuntimeServices rmServices;
+
+		// RM
+		ResourceManagerConfiguration rmConfiguration;
+		ResourceID rmResourceID;
+		static final String rmAddress = "/resourceManager";
+		TestingMesosResourceManager resourceManager;
+
+		// domain objects for test purposes
+		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 1);
+		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 2);
+
+		Protos.FrameworkID framework1 = Protos.FrameworkID.newBuilder().setValue("framework1").build();
+		public Protos.SlaveID slave1 = Protos.SlaveID.newBuilder().setValue("slave1").build();
+		public String slave1host = "localhost";
+		public Protos.OfferID offer1 = Protos.OfferID.newBuilder().setValue("offer1").build();
+		public Protos.TaskID task1 = Protos.TaskID.newBuilder().setValue("taskmanager-00001").build();
+		public Protos.TaskID task2 = Protos.TaskID.newBuilder().setValue("taskmanager-00002").build();
+		public Protos.TaskID task3 = Protos.TaskID.newBuilder().setValue("taskmanager-00003").build();
+
+		// task executors
+		SlotReport slotReport = new SlotReport();
+		public MockTaskExecutor task1Executor;
+		public MockTaskExecutor task2Executor;
+		public MockTaskExecutor task3Executor;
+
+		// job masters
+		public MockJobMaster jobMaster1;
+
+		/**
+		 * Create mock RM dependencies.
+		 */
+		public Context() {
+			try {
+				rpcService = new TestingSerialRpcService();
+				fatalErrorHandler = new TestingFatalErrorHandler();
+				rmServices = new MockMesosResourceManagerRuntimeServices();
+
+				// TaskExecutor templating
+				ContainerSpecification containerSpecification = new ContainerSpecification();
+				ContaineredTaskManagerParameters containeredParams =
+					new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
+				MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
+					1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
+					Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), Option.<String>empty(),
+					Option.<String>empty());
+
+				// resource manager
+				rmConfiguration = new ResourceManagerConfiguration(
+					Time.seconds(5L),
+					Time.seconds(5L));
+				rmResourceID = ResourceID.generate();
+				resourceManager =
+					new TestingMesosResourceManager(
+						rpcService,
+						rmAddress,
+						rmResourceID,
+						rmConfiguration,
+						rmServices.highAvailabilityServices,
+						rmServices.heartbeatServices,
+						rmServices.slotManager,
+						rmServices.metricRegistry,
+						rmServices.jobLeaderIdService,
+						fatalErrorHandler,
+						// Mesos specifics
+						system,
+						flinkConfig,
+						rmServices.mesosConfig,
+						rmServices.workerStore,
+						tmParams,
+						containerSpecification,
+						rmServices.artifactResolver
+					);
+
+				// TaskExecutors
+				task1Executor = mockTaskExecutor(task1);
+				task2Executor = mockTaskExecutor(task2);
+				task3Executor = mockTaskExecutor(task3);
+
+				// JobMaster
+				jobMaster1 = mockJobMaster(rmServices, new JobID(1,0));
+
+			} catch (Exception ex) {
+				throw new RuntimeException(ex);
+			}
+		}
+
+		/**
+		 * Mock services needed by the resource manager.
+		 */
+		class MockResourceManagerRuntimeServices {
+
+			public final ScheduledExecutor scheduledExecutor;
+			public final TestingHighAvailabilityServices highAvailabilityServices;
+			public final HeartbeatServices heartbeatServices;
+			public final MetricRegistry metricRegistry;
+			public final TestingLeaderElectionService rmLeaderElectionService;
+			public final JobLeaderIdService jobLeaderIdService;
+			public final SlotManager slotManager;
+			public ResourceManagerActions rmActions;
+
+			public UUID rmLeaderSessionId;
+
+			public MockResourceManagerRuntimeServices() throws Exception {
+				scheduledExecutor = mock(ScheduledExecutor.class);
+				highAvailabilityServices = new TestingHighAvailabilityServices();
+				rmLeaderElectionService = new TestingLeaderElectionService();
+				highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+				heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor);
+				metricRegistry = mock(MetricRegistry.class);
+				slotManager = mock(SlotManager.class);
+				jobLeaderIdService = new JobLeaderIdService(
+					highAvailabilityServices,
+					rpcService.getScheduledExecutor(),
+					Time.minutes(5L));
+
+				doAnswer(new Answer<Object>() {
+					@Override
+					public Object answer(InvocationOnMock invocation) throws Throwable {
+						rmActions = invocation.getArgumentAt(2, ResourceManagerActions.class);
+						return null;
+					}
+				}).when(slotManager).start(any(UUID.class),any(Executor.class),any(ResourceManagerActions.class));
+
+				when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenReturn(true);
+
+//				when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenAnswer(new Answer<Object>() {
+//					@Override
+//					public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+//						SlotRequest request = invocationOnMock.getArgumentAt(0, SlotRequest.class);
+//						return new RMSlotRequestRegistered(request.getAllocationId());
+//					}
+//				});
+			}
+
+			public void grantLeadership() {
+				rmLeaderSessionId = UUID.randomUUID();
+				rmLeaderElectionService.isLeader(rmLeaderSessionId);
+			}
+		}
+
+		class MockMesosResourceManagerRuntimeServices extends MockResourceManagerRuntimeServices {
+			public SchedulerDriver schedulerDriver;
+			public MesosConfiguration mesosConfig;
+			public MesosWorkerStore workerStore;
+			public MesosArtifactResolver artifactResolver;
+
+			public MockMesosResourceManagerRuntimeServices() throws Exception {
+				super();
+				schedulerDriver = mock(SchedulerDriver.class);
+
+				mesosConfig = mock(MesosConfiguration.class);
+				when(mesosConfig.frameworkInfo()).thenReturn(Protos.FrameworkInfo.newBuilder());
+				when(mesosConfig.withFrameworkInfo(any(Protos.FrameworkInfo.Builder.class))).thenReturn(mesosConfig);
+				when(mesosConfig.createDriver(any(Scheduler.class), anyBoolean())).thenReturn(schedulerDriver);
+
+				workerStore = mock(MesosWorkerStore.class);
+				when(workerStore.getFrameworkID()).thenReturn(Option.<Protos.FrameworkID>empty());
+
+				artifactResolver = mock(MesosArtifactResolver.class);
+			}
+		}
+
+		class MockJobMaster {
+			public final JobID jobID;
+			public final ResourceID resourceID;
+			public final String address;
+			public final JobMasterGateway gateway;
+			public final UUID leaderSessionID;
+			public final TestingLeaderRetrievalService leaderRetrievalService;
+
+			public MockJobMaster(JobID jobID) {
+				this.jobID = jobID;
+				this.resourceID = new ResourceID(jobID.toString());
+				this.address = "/" + jobID;
+				this.gateway = mock(JobMasterGateway.class);
+				this.leaderSessionID = UUID.randomUUID();
+				this.leaderRetrievalService = new TestingLeaderRetrievalService(this.address, this.leaderSessionID);
+			}
+		}
+
+		private MockJobMaster mockJobMaster(MockResourceManagerRuntimeServices rmServices, JobID jobID) {
+			MockJobMaster jm = new MockJobMaster(jobID);
+			rpcService.registerGateway(jm.address, jm.gateway);
+			rmServices.highAvailabilityServices.setJobMasterLeaderRetriever(jm.jobID, jm.leaderRetrievalService);
+			return jm;
+		}
+
+		static class MockTaskExecutor {
+			public final Protos.TaskID taskID;
+			public final String address;
+			public final ResourceID resourceID;
+			public final TaskExecutorGateway gateway;
+
+			public MockTaskExecutor(Protos.TaskID taskID) {
+				this.taskID = taskID;
+				this.address = "/" + taskID;
+				this.gateway = mock(TaskExecutorGateway.class);
+				this.resourceID = MesosResourceManager.extractResourceID(this.taskID);
+			}
+		}
+
+		private MockTaskExecutor mockTaskExecutor(Protos.TaskID taskID) {
+			MockTaskExecutor task = new MockTaskExecutor(taskID);
+			rpcService.registerGateway(task.address, task.gateway);
+			return task;
+		}
+
+		/**
+		 * Start the resource manager and grant leadership to it.
+		 */
+		public void startResourceManager() {
+			try {
+				resourceManager.start();
+				rmServices.grantLeadership();
+
+				// drain probe events
+				verify(rmServices.schedulerDriver).start();
+				resourceManager.connectionMonitor.expectMsgClass(ConnectionMonitor.Start.class);
+			} catch (Exception e) {
+				throw new RuntimeException("unable to initialize the RM", e);
+			}
+		}
+
+		/**
+		 * Register a job master with the RM.
+		 */
+		public void registerJobMaster(MockJobMaster jobMaster) throws Exception  {
+			Future<RegistrationResponse> registration = resourceManager.registerJobManager(
+				rmServices.rmLeaderSessionId, jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, jobMaster.jobID);
+			assertTrue(registration.get() instanceof JobMasterRegistrationSuccess);
+		}
+
+		/**
+		 * Allocate a worker using the RM.
+		 */
+		public MesosWorkerStore.Worker allocateWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) throws Exception {
+			when(rmServices.workerStore.newTaskID()).thenReturn(taskID);
+			rmServices.rmActions.allocateResource(resourceProfile);
+			MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(taskID, resourceProfile);
+
+			// drain the probe messages
+			verify(rmServices.workerStore).putWorker(expected);
+			assertThat(resourceManager.workersInNew, hasEntry(extractResourceID(taskID), expected));
+			resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
+			resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
+			return expected;
+		}
+
+		/**
+		 * Prepares a launch operation.
+		 */
+		public Protos.Offer.Operation launch(Protos.TaskInfo... taskInfo) {
+			return Protos.Offer.Operation.newBuilder()
+				.setType(Protos.Offer.Operation.Type.LAUNCH)
+				.setLaunch(Protos.Offer.Operation.Launch.newBuilder().addAllTaskInfos(Arrays.asList(taskInfo))
+				).build();
+		}
+
+		@Override
+		public void close() throws Exception {
+			rpcService.stopService();
+		}
+	}
+
+	@Test
+	public void testInitialize() throws Exception {
+		new Context() {{
+			startResourceManager();
+			LOG.info("initialized");
+		}};
+	}
+
+	/**
+	 * Test recovery of persistent workers.
+	 */
+	@Test
+	public void testRecoverWorkers() throws Exception {
+		new Context() {{
+			// set the initial persistent state then initialize the RM
+			MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
+			MesosWorkerStore.Worker worker2 = MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host);
+			MesosWorkerStore.Worker worker3 = MesosWorkerStore.Worker.newWorker(task3).launchWorker(slave1, slave1host).releaseWorker();
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			when(rmServices.workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1, worker2, worker3));
+			startResourceManager();
+
+			// verify that the internal state was updated, the task router was notified,
+			// and the launch coordinator was told about the already-launched task (Assign).
+			// note: "new" workers are discarded
+			assertThat(resourceManager.workersInNew.entrySet(), empty());
+			assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task2), worker2));
+			assertThat(resourceManager.workersBeingReturned, hasEntry(extractResourceID(task3), worker3));
+			resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
+			LaunchCoordinator.Assign actualAssign =
+				resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Assign.class);
+			assertThat(actualAssign.tasks(), hasSize(1));
+			assertThat(actualAssign.tasks().get(0).f0.getId(), equalTo(task2.getValue()));
+			assertThat(actualAssign.tasks().get(0).f1, equalTo(slave1host));
+			resourceManager.launchCoordinator.expectNoMsg();
+		}};
+	}
+
+	/**
+	 * Test that a resource allocation request creates, persists and schedules a new worker.
+	 */
+	@Test
+	public void testRequestNewWorkers() throws Exception {
+		new Context() {{
+			startResourceManager();
+
+			// allocate a worker for resourceProfile1; drawing a second task ID fails the test
+			when(rmServices.workerStore.newTaskID()).thenReturn(task1).thenThrow(new AssertionFailedError());
+			rmServices.rmActions.allocateResource(resourceProfile1);
+
+			// verify that a new worker was persisted, the internal state was updated, the task router was notified,
+			// and the launch coordinator was asked to launch a task
+			MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(task1, resourceProfile1);
+			verify(rmServices.workerStore).putWorker(expected);
+			assertThat(resourceManager.workersInNew, hasEntry(extractResourceID(task1), expected));
+			resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
+			resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
+		}};
+	}
+
+	/**
+	 * Test offer handling.
+	 */
+	@Test
+	public void testOfferHandling() throws Exception {
+		new Context() {{
+			startResourceManager();
+
+			// Verify that the RM forwards offers to the launch coordinator.
+			resourceManager.resourceOffers(new ResourceOffers(Collections.<Protos.Offer>emptyList()));
+			resourceManager.launchCoordinator.expectMsgClass(ResourceOffers.class);
+			resourceManager.offerRescinded(new OfferRescinded(offer1));
+			resourceManager.launchCoordinator.expectMsgClass(OfferRescinded.class);
+		}};
+	}
+
+	/**
+	 * Test offer acceptance.
+	 */
+	@Test
+	public void testAcceptOffers() throws Exception {
+		new Context() {{
+			startResourceManager();
+
+			// allocate a new worker
+			MesosWorkerStore.Worker worker1 = allocateWorker(task1, resourceProfile1);
+
+			// send an AcceptOffers message as the LaunchCoordinator would
+			// to launch task1 onto slave1 with offer1
+			Protos.TaskInfo task1info = Protos.TaskInfo.newBuilder()
+				.setTaskId(task1).setName("").setSlaveId(slave1).build();
+			AcceptOffers msg = new AcceptOffers(slave1host, singletonList(offer1), singletonList(launch(task1info)));
+			resourceManager.acceptOffers(msg);
+
+			// verify that the worker was persisted, the internal state was updated,
+			// Mesos was asked to launch task1, and the task router was notified
+			MesosWorkerStore.Worker worker1launched = worker1.launchWorker(slave1, slave1host);
+			verify(rmServices.workerStore).putWorker(worker1launched);
+			assertThat(resourceManager.workersInNew.entrySet(), empty());
+			assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
+			resourceManager.taskRouter.expectMsg(
+				new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker1launched)));
+			verify(rmServices.schedulerDriver).acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
+		}};
+	}
+
+	/**
+	 * Test status handling.
+	 */
+	@Test
+	public void testStatusHandling() throws Exception {
+		new Context() {{
+			startResourceManager();
+
+			// Verify that the RM forwards status updates to the reconciliation coordinator and task router.
+			resourceManager.statusUpdate(new StatusUpdate(Protos.TaskStatus.newBuilder()
+				.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_LOST).build()));
+			resourceManager.reconciliationCoordinator.expectMsgClass(StatusUpdate.class);
+			resourceManager.taskRouter.expectMsgClass(StatusUpdate.class);
+		}};
+	}
+
+
+	/**
+	 * Test worker registration after launch.
+	 */
+	@Test
+	public void testWorkerStarted() throws Exception {
+		new Context() {{
+			// set the initial state with a (recovered) launched worker
+			MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			when(rmServices.workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
+			startResourceManager();
+			assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
+
+			// send registration message
+			Future<RegistrationResponse> successfulFuture =
+				resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, task1Executor.address, task1Executor.resourceID, slotReport);
+			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+			// verify the internal state
+			assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
+		}};
+	}
+
+
+	/**
+	 * Test the planned release of registered workers.
+	 */
+	@Test
+	@Ignore
+	public void testReleaseRegisteredWorker() throws Exception {
+		// not supported by RM
+	}
+
+	/**
+	 * Test unplanned task failure of a pending worker.
+	 */
+	@Test
+	public void testWorkerFailed() throws Exception {
+		new Context() {{
+			// set the initial persistent state with a launched worker
+			MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			when(rmServices.workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
+			startResourceManager();
+
+			// tell the RM that a task failed
+			when(rmServices.workerStore.removeWorker(task1)).thenReturn(true);
+			resourceManager.taskTerminated(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
+				.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
+
+			// verify that the instance state was updated
+			verify(rmServices.workerStore).removeWorker(task1);
+			assertThat(resourceManager.workersInLaunch.entrySet(), empty());
+			assertThat(resourceManager.workersBeingReturned.entrySet(), empty());
+
+			// verify that `closeTaskManagerConnection` was called
+			assertThat(resourceManager.closedTaskManagerConnections, hasItem(extractResourceID(task1)));
+		}};
+	}
+
+	/**
+	 * Test application shutdown handling.
+	 */
+	@Test
+	public void testShutdownApplication() throws Exception {
+		new Context() {{
+			startResourceManager();
+			resourceManager.shutDownCluster(ApplicationStatus.SUCCEEDED, "");
+
+			// verify that the Mesos framework is shut down
+			verify(rmServices.schedulerDriver).stop(false);
+			verify(rmServices.workerStore).stop(true);
+		}};
+	}
+
+	// ------------- connectivity tests -----------------------------
+
+	/**
+	 * Test Mesos registration handling.
+	 */
+	@Test
+	public void testRegistered() throws Exception {
+		new Context() {{
+			startResourceManager();
+
+			Protos.MasterInfo masterInfo = Protos.MasterInfo.newBuilder()
+				.setId("master1").setIp(0).setPort(5050).build();
+			resourceManager.registered(new Registered(framework1, masterInfo));
+
+			verify(rmServices.workerStore).setFrameworkID(Option.apply(framework1));
+			resourceManager.connectionMonitor.expectMsgClass(Registered.class);
+			resourceManager.reconciliationCoordinator.expectMsgClass(Registered.class);
+			resourceManager.launchCoordinator.expectMsgClass(Registered.class);
+			resourceManager.taskRouter.expectMsgClass(Registered.class);
+		}};
+	}
+
+
+	/**
+	 * Test Mesos re-registration handling.
+	 */
+	@Test
+	public void testReRegistered() throws Exception {
+		new Context() {{
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			startResourceManager();
+
+			Protos.MasterInfo masterInfo = Protos.MasterInfo.newBuilder()
+				.setId("master1").setIp(0).setPort(5050).build();
+			resourceManager.reregistered(new ReRegistered(masterInfo));
+
+			resourceManager.connectionMonitor.expectMsgClass(ReRegistered.class);
+			resourceManager.reconciliationCoordinator.expectMsgClass(ReRegistered.class);
+			resourceManager.launchCoordinator.expectMsgClass(ReRegistered.class);
+			resourceManager.taskRouter.expectMsgClass(ReRegistered.class);
+		}};
+	}
+
+	/**
+	 * Test Mesos disconnection handling.
+	 */
+	@Test
+	public void testDisconnected() throws Exception {
+		new Context() {{
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			startResourceManager();
+
+			resourceManager.disconnected(new Disconnected());
+
+			resourceManager.connectionMonitor.expectMsgClass(Disconnected.class);
+			resourceManager.reconciliationCoordinator.expectMsgClass(Disconnected.class);
+			resourceManager.launchCoordinator.expectMsgClass(Disconnected.class);
+			resourceManager.taskRouter.expectMsgClass(Disconnected.class);
+		}};
+	}
+
+	/**
+	 * Test Mesos scheduler error.
+	 */
+	@Test
+	public void testError() throws Exception {
+		new Context() {{
+			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+			startResourceManager();
+			resourceManager.error(new Error("test"));
+			assertTrue(fatalErrorHandler.hasExceptionOccurred());
+		}};
+	}
+
+	@Test
+	public void testAdapter() throws Exception {
+		Protos.TaskID task1 = Protos.TaskID.newBuilder().setValue("taskmanager-00001").build();
+		Protos.TaskStatus status1 = Protos.TaskStatus.newBuilder().setTaskId(task1).setState(Protos.TaskState.TASK_KILLED).build();
+		String host1 = "host1";
+
+		MesosResourceManagerGateway gateway = mock(MesosResourceManagerGateway.class);
+		ActorRef adapter = system.actorOf(MesosResourceManager.AkkaAdapter.createActorProps(gateway));
+
+		List<Protos.TaskStatus> tasks = Collections.singletonList(status1);
+		ReconciliationCoordinator.Reconcile msg1 = new ReconciliationCoordinator.Reconcile(
+			scala.collection.JavaConverters.asScalaBufferConverter(tasks).asScala(), false);
+		adapter.tell(msg1, ActorRef.noSender());
+		verify(gateway).reconcile(eq(msg1));
+
+		TaskMonitor.TaskTerminated msg2 = new TaskMonitor.TaskTerminated(task1, status1);
+		adapter.tell(msg2, ActorRef.noSender());
+		verify(gateway).taskTerminated(eq(msg2));
+
+		AcceptOffers msg3 = new AcceptOffers(host1, Collections.<Protos.OfferID>emptyList(), Collections.<Protos.Offer.Operation>emptyList());
+		adapter.tell(msg3, ActorRef.noSender());
+		verify(gateway).acceptOffers(eq(msg3));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index bef0aa3..92f007f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -83,8 +83,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-public abstract class ResourceManager<WorkerType extends Serializable>
-		extends RpcEndpoint<ResourceManagerGateway>
+public abstract class ResourceManager<C extends ResourceManagerGateway, WorkerType extends Serializable>
+		extends RpcEndpoint<C>
 		implements LeaderContender {
 
 	public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
@@ -419,6 +419,11 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 						}
 
 						WorkerType newWorker = workerStarted(taskExecutorResourceId);
+						if(newWorker == null) {
+							log.warn("Discard registration from TaskExecutor {} at ({}) because the framework did " +
+									"not recognize it", taskExecutorResourceId, taskExecutorAddress);
+							return new RegistrationResponse.Decline("unrecognized TaskExecutor");
+						}
 						WorkerRegistration<WorkerType> registration =
 							new WorkerRegistration<>(taskExecutorGateway, newWorker);
 
@@ -783,7 +788,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 *
 	 * @param t The exception describing the fatal error
 	 */
-	void onFatalError(Throwable t) {
+	protected void onFatalError(Throwable t) {
 		log.error("Fatal error occurred.", t);
 		fatalErrorHandler.onFatalError(t);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index d0c411c..12d3a7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -42,7 +42,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 
 	private final ResourceManagerRuntimeServices resourceManagerRuntimeServices;
 
-	private final ResourceManager<?> resourceManager;
+	private final ResourceManager<? extends ResourceManagerGateway, ?> resourceManager;
 
 	public ResourceManagerRunner(
 			final ResourceID resourceId,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index a921a29..afeddc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.rpc.RpcService;
  *
  * This ResourceManager doesn't acquire new resources.
  */
-public class StandaloneResourceManager extends ResourceManager<ResourceID> {
+public class StandaloneResourceManager extends ResourceManager<StandaloneResourceManagerGateway, ResourceID> {
 
 	public StandaloneResourceManager(
 			RpcService rpcService,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java
new file mode 100644
index 0000000..6c8de66
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+/**
+ * The {@link StandaloneResourceManager}'s RPC gateway interface.
+ */
+public interface StandaloneResourceManagerGateway extends ResourceManagerGateway {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 40b9568..311fa49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -51,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * of Erlang or Akka.
  *
  * <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)}
-  * and the {@link #getMainThreadExecutor()} to execute code in the RPC endoint's main thread.
+  * and the {@link #getMainThreadExecutor()} to execute code in the RPC endpoint's main thread.
  *
  * @param <C> The RPC gateway counterpart for the implementing RPC endpoint
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 6a0bd87..85ed950 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -126,7 +127,7 @@ public class TaskExecutorITCase extends TestLogger {
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime());
 
-		ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
+		ResourceManager<StandaloneResourceManagerGateway,ResourceID> resourceManager = new StandaloneResourceManager(
 			rpcService,
 			FlinkResourceManager.RESOURCE_MANAGER_NAME,
 			rmResourceId,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 2ad9065..cef3378 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -193,7 +193,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
 	}
 
-	private ResourceManager<?> createResourceManager(Configuration config) throws Exception {
+	private ResourceManager<?,?> createResourceManager(Configuration config) throws Exception {
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
 		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(config);
 		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6099d18..4ee30f4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -63,7 +63,7 @@ import scala.concurrent.duration.FiniteDuration;
  * The yarn implementation of the resource manager. Used when the system is started
  * via the resource framework YARN.
  */
-public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
+public class YarnResourceManager extends ResourceManager<YarnResourceManagerGateway, ResourceID> implements AMRMClientAsync.CallbackHandler {
 
 	/** The process environment variables. */
 	private final Map<String, String> env;

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe27ac0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java
new file mode 100644
index 0000000..485fb90
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+
+/**
+ * The {@link YarnResourceManager}'s RPC gateway interface.
+ */
+public interface YarnResourceManagerGateway extends ResourceManagerGateway {
+}


[3/4] flink git commit: [FLINK-6379] [tests] Fix race condition in MesosResourceManagerTest

Posted by tr...@apache.org.
[FLINK-6379] [tests] Fix race condition in MesosResourceManagerTest

The MesosResourceManagerTest#testAdapter tests the AkkaAdapter class. The tests
are executed asynchronously and thus it is necessary to introduce timeouts for
the verify calls. This commit fixes the test instability by introducing timeouts.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4bb488c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4bb488c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4bb488c0

Branch: refs/heads/master
Commit: 4bb488c0f6673ef9099ae7adfc1ec5a9e125cdbf
Parents: 3fe27ac
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 27 14:10:55 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 29 08:41:33 2017 +0200

----------------------------------------------------------------------
 .../clusterframework/MesosResourceManager.java  |  89 ++++++-----
 .../MesosResourceManagerGateway.java            |  28 +++-
 .../store/MesosWorkerStore.java                 |  19 +--
 .../MesosResourceManagerTest.java               | 159 ++++++++-----------
 .../resourcemanager/ResourceManager.java        |  46 +++---
 5 files changed, 167 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4bb488c0/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 71bfacd..bb20060 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -68,6 +68,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
 import org.apache.mesos.Protos;
 import org.apache.mesos.SchedulerDriver;
 import org.slf4j.Logger;
@@ -80,8 +81,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static java.util.Objects.requireNonNull;
-
 /**
  * The Mesos implementation of the resource manager.
  */
@@ -121,7 +120,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 	private ActorRef connectionMonitor;
 
-	private ActorRef taskRouter;
+	private ActorRef taskMonitor;
 
 	private ActorRef launchCoordinator;
 
@@ -164,20 +163,20 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 			jobLeaderIdService,
 			fatalErrorHandler);
 
-		this.actorSystem = actorSystem;
+		this.actorSystem = Preconditions.checkNotNull(actorSystem);
 
-		this.flinkConfig = requireNonNull(flinkConfig);
-		this.mesosConfig = requireNonNull(mesosConfig);
+		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+		this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
 
-		this.workerStore = requireNonNull(workerStore);
-		this.artifactResolver = requireNonNull(artifactResolver);
+		this.workerStore = Preconditions.checkNotNull(workerStore);
+		this.artifactResolver = Preconditions.checkNotNull(artifactResolver);
 
-		this.taskManagerParameters = requireNonNull(taskManagerParameters);
-		this.taskManagerContainerSpec = requireNonNull(taskManagerContainerSpec);
+		this.taskManagerParameters = Preconditions.checkNotNull(taskManagerParameters);
+		this.taskManagerContainerSpec = Preconditions.checkNotNull(taskManagerContainerSpec);
 
-		this.workersInNew = new HashMap<>();
-		this.workersInLaunch = new HashMap<>();
-		this.workersBeingReturned = new HashMap<>();
+		this.workersInNew = new HashMap<>(8);
+		this.workersInLaunch = new HashMap<>(8);
+		this.workersBeingReturned = new HashMap<>(8);
 	}
 
 	protected ActorRef createSelfActor() {
@@ -257,7 +256,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		connectionMonitor = createConnectionMonitor();
 		launchCoordinator = createLaunchCoordinator();
 		reconciliationCoordinator = createReconciliationCoordinator();
-		taskRouter = createTaskRouter();
+		taskMonitor = createTaskRouter();
 
 		// recover state
 		try {
@@ -307,7 +306,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 						workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
 						break;
 				}
-				taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
+				taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
 			}
 
 			// tell the launch coordinator about prior assignments
@@ -352,14 +351,14 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 			LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).",
 				launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
 
-			// tell the task router about the new plans
-			taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
+			// tell the task monitor about the new plans
+			taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
 
 			// tell the launch coordinator to launch the new tasks
 			launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor);
 		}
 		catch(Exception ex) {
-			onFatalErrorAsync(new ResourceManagerException("unable to request new workers", ex));
+			onFatalErrorAsync(new ResourceManagerException("Unable to request new workers.", ex));
 		}
 	}
 
@@ -370,6 +369,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 	/**
 	 * Callback when a worker was started.
+	 *
 	 * @param resourceID The worker resource id (as provided by the TaskExecutor)
 	 */
 	@Override
@@ -379,11 +379,11 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		MesosWorkerStore.Worker inLaunch = workersInLaunch.get(resourceID);
 		if (inLaunch != null) {
 			return new RegisteredMesosWorkerNode(inLaunch);
+		} else {
+			// the worker is unrecognized or was already released
+			// return null to indicate that TaskExecutor registration should be declined
+			return null;
 		}
-
-		// the worker is unrecognized or was already released
-		// return null to indicate that TaskExecutor registration should be declined
-		return null;
 	}
 
 	// ------------------------------------------------------------------------
@@ -397,12 +397,13 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 			workerStore.setFrameworkID(Option.apply(message.frameworkId()));
 		}
 		catch(Exception ex) {
-			onFatalError(new ResourceManagerException("unable to store the assigned framework ID", ex));
+			onFatalError(new ResourceManagerException("Unable to store the assigned framework ID.", ex));
 			return;
 		}
+
 		launchCoordinator.tell(message, selfActor);
 		reconciliationCoordinator.tell(message, selfActor);
-		taskRouter.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
 	}
 
 	/**
@@ -413,7 +414,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		connectionMonitor.tell(message, selfActor);
 		launchCoordinator.tell(message, selfActor);
 		reconciliationCoordinator.tell(message, selfActor);
-		taskRouter.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
 	}
 
 	/**
@@ -424,7 +425,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		connectionMonitor.tell(message, selfActor);
 		launchCoordinator.tell(message, selfActor);
 		reconciliationCoordinator.tell(message, selfActor);
-		taskRouter.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
 	}
 
 	/**
@@ -456,27 +457,26 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 			// transition the persistent state of some tasks to Launched
 			for (Protos.Offer.Operation op : msg.operations()) {
-				if (op.getType() != Protos.Offer.Operation.Type.LAUNCH) {
-					continue;
-				}
-				for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
-					MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId()));
-					assert (worker != null);
+				if (op.getType() == Protos.Offer.Operation.Type.LAUNCH) {
+					for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
+						MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId()));
+						assert (worker != null);
 
-					worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
-					workerStore.putWorker(worker);
-					workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+						worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
+						workerStore.putWorker(worker);
+						workersInLaunch.put(extractResourceID(worker.taskID()), worker);
 
-					LOG.info("Launching Mesos task {} on host {}.",
-						worker.taskID().getValue(), worker.hostname().get());
+						LOG.info("Launching Mesos task {} on host {}.",
+							worker.taskID().getValue(), worker.hostname().get());
 
-					toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+						toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+					}
 				}
 			}
 
-			// tell the task router about the new plans
+			// tell the task monitor about the new plans
 			for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
-				taskRouter.tell(update, selfActor);
+				taskMonitor.tell(update, selfActor);
 			}
 
 			// send the acceptance message to Mesos
@@ -492,7 +492,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 	 */
 	@RpcMethod
 	public void statusUpdate(StatusUpdate message) {
-		taskRouter.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
 		reconciliationCoordinator.tell(message, selfActor);
 		schedulerDriver.acknowledgeStatusUpdate(message.status());
 	}
@@ -541,8 +541,8 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 			// failed worker, either at startup, or running
 			final MesosWorkerStore.Worker launched = workersInLaunch.remove(id);
 			assert(launched != null);
-			LOG.info("Worker {} failed with status: {}, reason: {}, message: {}. " +
-				"State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage());
+			LOG.info("Worker {} failed with status: {}, reason: {}, message: {}.",
+				id, status.getState(), status.getReason(), status.getMessage());
 
 			// TODO : launch a replacement worker?
 		}
@@ -578,7 +578,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 		// create the specific TM parameters from the resource profile and some defaults
 		MesosTaskManagerParameters params = new MesosTaskManagerParameters(
-			resourceProfile.getCpuCores() < 1 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(),
+			resourceProfile.getCpuCores() < 1.0 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(),
 			taskManagerParameters.containerType(),
 			taskManagerParameters.containerImageName(),
 			new ContaineredTaskManagerParameters(
@@ -616,6 +616,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 	/**
 	 * Extracts the Mesos task goal state from the worker information.
+	 *
 	 * @param worker the persistent worker information.
 	 * @return goal state information for the {@Link TaskMonitor}.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/4bb488c0/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
index e353dcc..70ed47d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
 import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
 import org.apache.flink.mesos.scheduler.SchedulerGateway;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
@@ -29,9 +30,30 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
  */
 public interface MesosResourceManagerGateway extends ResourceManagerGateway, SchedulerGateway {
 
-	void acceptOffers(AcceptOffers msg);
+	/**
+	 * Accept the given offers as advised by the launch coordinator.
+	 *
+	 * <p>Note: This method is a callback for the {@link LaunchCoordinator}.
+	 *
+	 * @param offersToAccept Offers to accept from Mesos
+	 */
+	void acceptOffers(AcceptOffers offersToAccept);
 
-	void reconcile(ReconciliationCoordinator.Reconcile message);
+	/**
+	 * Trigger reconciliation with the Mesos master.
+	 *
+	 * <p>Note: This method is a callback for the {@link TaskMonitor}.
+	 *
+	 * @param reconciliationRequest Message containing the tasks which shall be reconciled
+	 */
+	void reconcile(ReconciliationCoordinator.Reconcile reconciliationRequest);
 
-	void taskTerminated(TaskMonitor.TaskTerminated message);
+	/**
+	 * Notify that the given Mesos task has been terminated.
+	 *
+	 * <p>Note: This method is a callback for the {@link TaskMonitor}.
+	 *
+	 * @param terminatedTask Message containing the terminated task
+	 */
+	void taskTerminated(TaskMonitor.TaskTerminated terminatedTask);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4bb488c0/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
index e76ff63..a9ec892 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
@@ -104,17 +104,11 @@ public interface MesosWorkerStore {
 
 		private Worker(Protos.TaskID taskID, ResourceProfile profile,
 				Option<Protos.SlaveID> slaveID, Option<String> hostname, WorkerState state) {
-			requireNonNull(taskID, "taskID");
-			requireNonNull(profile, "profile");
-			requireNonNull(slaveID, "slaveID");
-			requireNonNull(hostname, "hostname");
-			requireNonNull(state, "state");
-
-			this.taskID = taskID;
-			this.profile = profile;
-			this.slaveID = slaveID;
-			this.hostname = hostname;
-			this.state = state;
+			this.taskID = requireNonNull(taskID, "taskID");
+			this.profile = requireNonNull(profile, "profile");
+			this.slaveID = requireNonNull(slaveID, "slaveID");
+			this.hostname = requireNonNull(hostname, "hostname");
+			this.state = requireNonNull(state, "state");
 		}
 
 		/**
@@ -207,12 +201,13 @@ public interface MesosWorkerStore {
 			return Objects.equals(taskID, worker.taskID) &&
 				Objects.equals(slaveID, worker.slaveID) &&
 				Objects.equals(hostname, worker.hostname) &&
+				Objects.equals(profile, worker.profile) &&
 				state == worker.state;
 		}
 
 		@Override
 		public int hashCode() {
-			return Objects.hash(taskID, slaveID, hostname, state);
+			return Objects.hash(taskID, slaveID, hostname, state, profile);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/4bb488c0/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 0c715ba..929e927 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -75,6 +75,7 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.collection.JavaConverters;
 
 import java.util.*;
 import java.util.concurrent.Executor;
@@ -134,6 +135,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			MetricRegistry metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler,
+
 			// Mesos specifics
 			ActorSystem actorSystem,
 			Configuration flinkConfig,
@@ -182,7 +184,6 @@ public class MesosResourceManagerTest extends TestLogger {
 
 		// domain objects for test purposes
 		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 1);
-		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 2);
 
 		Protos.FrameworkID framework1 = Protos.FrameworkID.newBuilder().setValue("framework1").build();
 		public Protos.SlaveID slave1 = Protos.SlaveID.newBuilder().setValue("slave1").build();
@@ -204,59 +205,54 @@ public class MesosResourceManagerTest extends TestLogger {
 		/**
 		 * Create mock RM dependencies.
 		 */
-		public Context() {
-			try {
-				rpcService = new TestingSerialRpcService();
-				fatalErrorHandler = new TestingFatalErrorHandler();
-				rmServices = new MockMesosResourceManagerRuntimeServices();
-
-				// TaskExecutor templating
-				ContainerSpecification containerSpecification = new ContainerSpecification();
-				ContaineredTaskManagerParameters containeredParams =
-					new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
-				MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
-					1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
-					Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), Option.<String>empty(),
-					Option.<String>empty());
-
-				// resource manager
-				rmConfiguration = new ResourceManagerConfiguration(
-					Time.seconds(5L),
-					Time.seconds(5L));
-				rmResourceID = ResourceID.generate();
-				resourceManager =
-					new TestingMesosResourceManager(
-						rpcService,
-						rmAddress,
-						rmResourceID,
-						rmConfiguration,
-						rmServices.highAvailabilityServices,
-						rmServices.heartbeatServices,
-						rmServices.slotManager,
-						rmServices.metricRegistry,
-						rmServices.jobLeaderIdService,
-						fatalErrorHandler,
-						// Mesos specifics
-						system,
-						flinkConfig,
-						rmServices.mesosConfig,
-						rmServices.workerStore,
-						tmParams,
-						containerSpecification,
-						rmServices.artifactResolver
-					);
-
-				// TaskExecutors
-				task1Executor = mockTaskExecutor(task1);
-				task2Executor = mockTaskExecutor(task2);
-				task3Executor = mockTaskExecutor(task3);
-
-				// JobMaster
-				jobMaster1 = mockJobMaster(rmServices, new JobID(1,0));
-
-			} catch (Exception ex) {
-				throw new RuntimeException(ex);
-			}
+		Context() throws Exception {
+			rpcService = new TestingSerialRpcService();
+			fatalErrorHandler = new TestingFatalErrorHandler();
+			rmServices = new MockMesosResourceManagerRuntimeServices();
+
+			// TaskExecutor templating
+			ContainerSpecification containerSpecification = new ContainerSpecification();
+			ContaineredTaskManagerParameters containeredParams =
+				new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
+			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
+				1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
+				Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), Option.<String>empty(),
+				Option.<String>empty());
+
+			// resource manager
+			rmConfiguration = new ResourceManagerConfiguration(
+				Time.seconds(5L),
+				Time.seconds(5L));
+			rmResourceID = ResourceID.generate();
+			resourceManager =
+				new TestingMesosResourceManager(
+					rpcService,
+					rmAddress,
+					rmResourceID,
+					rmConfiguration,
+					rmServices.highAvailabilityServices,
+					rmServices.heartbeatServices,
+					rmServices.slotManager,
+					rmServices.metricRegistry,
+					rmServices.jobLeaderIdService,
+					fatalErrorHandler,
+					// Mesos specifics
+					system,
+					flinkConfig,
+					rmServices.mesosConfig,
+					rmServices.workerStore,
+					tmParams,
+					containerSpecification,
+					rmServices.artifactResolver
+				);
+
+			// TaskExecutors
+			task1Executor = mockTaskExecutor(task1);
+			task2Executor = mockTaskExecutor(task2);
+			task3Executor = mockTaskExecutor(task3);
+
+			// JobMaster
+			jobMaster1 = mockJobMaster(rmServices, new JobID(1,0));
 		}
 
 		/**
@@ -275,7 +271,7 @@ public class MesosResourceManagerTest extends TestLogger {
 
 			public UUID rmLeaderSessionId;
 
-			public MockResourceManagerRuntimeServices() throws Exception {
+			MockResourceManagerRuntimeServices() throws Exception {
 				scheduledExecutor = mock(ScheduledExecutor.class);
 				highAvailabilityServices = new TestingHighAvailabilityServices();
 				rmLeaderElectionService = new TestingLeaderElectionService();
@@ -297,14 +293,6 @@ public class MesosResourceManagerTest extends TestLogger {
 				}).when(slotManager).start(any(UUID.class),any(Executor.class),any(ResourceManagerActions.class));
 
 				when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenReturn(true);
-
-//				when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenAnswer(new Answer<Object>() {
-//					@Override
-//					public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-//						SlotRequest request = invocationOnMock.getArgumentAt(0, SlotRequest.class);
-//						return new RMSlotRequestRegistered(request.getAllocationId());
-//					}
-//				});
 			}
 
 			public void grantLeadership() {
@@ -319,8 +307,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			public MesosWorkerStore workerStore;
 			public MesosArtifactResolver artifactResolver;
 
-			public MockMesosResourceManagerRuntimeServices() throws Exception {
-				super();
+			MockMesosResourceManagerRuntimeServices() throws Exception {
 				schedulerDriver = mock(SchedulerDriver.class);
 
 				mesosConfig = mock(MesosConfiguration.class);
@@ -343,7 +330,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			public final UUID leaderSessionID;
 			public final TestingLeaderRetrievalService leaderRetrievalService;
 
-			public MockJobMaster(JobID jobID) {
+			MockJobMaster(JobID jobID) {
 				this.jobID = jobID;
 				this.resourceID = new ResourceID(jobID.toString());
 				this.address = "/" + jobID;
@@ -366,7 +353,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			public final ResourceID resourceID;
 			public final TaskExecutorGateway gateway;
 
-			public MockTaskExecutor(Protos.TaskID taskID) {
+			MockTaskExecutor(Protos.TaskID taskID) {
 				this.taskID = taskID;
 				this.address = "/" + taskID;
 				this.gateway = mock(TaskExecutorGateway.class);
@@ -383,17 +370,13 @@ public class MesosResourceManagerTest extends TestLogger {
 		/**
 		 * Start the resource manager and grant leadership to it.
 		 */
-		public void startResourceManager() {
-			try {
-				resourceManager.start();
-				rmServices.grantLeadership();
-
-				// drain probe events
-				verify(rmServices.schedulerDriver).start();
-				resourceManager.connectionMonitor.expectMsgClass(ConnectionMonitor.Start.class);
-			} catch (Exception e) {
-				throw new RuntimeException("unable to initialize the RM", e);
-			}
+		public void startResourceManager() throws Exception {
+			resourceManager.start();
+			rmServices.grantLeadership();
+
+			// drain probe events
+			verify(rmServices.schedulerDriver).start();
+			resourceManager.connectionMonitor.expectMsgClass(ConnectionMonitor.Start.class);
 		}
 
 		/**
@@ -489,7 +472,7 @@ public class MesosResourceManagerTest extends TestLogger {
 
 			// verify that a new worker was persisted, the internal state was updated, the task router was notified,
 			// and the launch coordinator was asked to launch a task
-			MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(task1);
+			MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(task1, resourceProfile1);
 			verify(rmServices.workerStore).putWorker(expected);
 			assertThat(resourceManager.workersInNew, hasEntry(extractResourceID(task1), expected));
 			resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
@@ -584,16 +567,6 @@ public class MesosResourceManagerTest extends TestLogger {
 		}};
 	}
 
-
-	/**
-	 * Test the planned release of registered workers.
-	 */
-	@Test
-	@Ignore
-	public void testReleaseRegisteredWorker() throws Exception {
-		// not supported by RM
-	}
-
 	/**
 	 * Test unplanned task failure of a pending worker.
 	 */
@@ -721,16 +694,16 @@ public class MesosResourceManagerTest extends TestLogger {
 
 		List<Protos.TaskStatus> tasks = Collections.singletonList(status1);
 		ReconciliationCoordinator.Reconcile msg1 = new ReconciliationCoordinator.Reconcile(
-			scala.collection.JavaConverters.asScalaBufferConverter(tasks).asScala(), false);
+			JavaConverters.asScalaBufferConverter(tasks).asScala(), false);
 		adapter.tell(msg1, ActorRef.noSender());
-		verify(gateway).reconcile(eq(msg1));
+		verify(gateway, timeout(1000L)).reconcile(eq(msg1));
 
 		TaskMonitor.TaskTerminated msg2 = new TaskMonitor.TaskTerminated(task1, status1);
 		adapter.tell(msg2, ActorRef.noSender());
-		verify(gateway).taskTerminated(eq(msg2));
+		verify(gateway, timeout(1000L)).taskTerminated(eq(msg2));
 
 		AcceptOffers msg3 = new AcceptOffers(host1, Collections.<Protos.OfferID>emptyList(), Collections.<Protos.Offer.Operation>emptyList());
 		adapter.tell(msg3, ActorRef.noSender());
-		verify(gateway).acceptOffers(eq(msg3));
+		verify(gateway, timeout(1000L)).acceptOffers(eq(msg3));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4bb488c0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 92f007f..df452e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -79,7 +79,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It offers the following methods as part of its rpc interface to interact with him remotely:
  * <ul>
- *     <li>{@link #registerJobManager(UUID, UUID, String, JobID, ResourceID)} registers a {@link JobMaster} at the resource manager</li>
+ *     <li>{@link #registerJobManager(UUID, UUID, ResourceID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
@@ -418,36 +418,38 @@ public abstract class ResourceManager<C extends ResourceManagerGateway, WorkerTy
 							slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
 						}
 
-						WorkerType newWorker = workerStarted(taskExecutorResourceId);
+						final WorkerType newWorker = workerStarted(taskExecutorResourceId);
+
 						if(newWorker == null) {
 							log.warn("Discard registration from TaskExecutor {} at ({}) because the framework did " +
 									"not recognize it", taskExecutorResourceId, taskExecutorAddress);
 							return new RegistrationResponse.Decline("unrecognized TaskExecutor");
-						}
-						WorkerRegistration<WorkerType> registration =
-							new WorkerRegistration<>(taskExecutorGateway, newWorker);
+						} else {
+							WorkerRegistration<WorkerType> registration =
+								new WorkerRegistration<>(taskExecutorGateway, newWorker);
 
-						taskExecutors.put(taskExecutorResourceId, registration);
+							taskExecutors.put(taskExecutorResourceId, registration);
 
-						slotManager.registerTaskManager(registration, slotReport);
+							slotManager.registerTaskManager(registration, slotReport);
 
-						taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
-							@Override
-							public void receiveHeartbeat(ResourceID resourceID, Void payload) {
-								// the ResourceManager will always send heartbeat requests to the
-								// TaskManager
-							}
+							taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
+								@Override
+								public void receiveHeartbeat(ResourceID resourceID, Void payload) {
+									// the ResourceManager will always send heartbeat requests to the
+									// TaskManager
+								}
 
-							@Override
-							public void requestHeartbeat(ResourceID resourceID, Void payload) {
-								taskExecutorGateway.heartbeatFromResourceManager(resourceID);
-							}
-						});
+								@Override
+								public void requestHeartbeat(ResourceID resourceID, Void payload) {
+									taskExecutorGateway.heartbeatFromResourceManager(resourceID);
+								}
+							});
 
-						return new TaskExecutorRegistrationSuccess(
-							registration.getInstanceID(),
-							resourceId,
-							resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
+							return new TaskExecutorRegistrationSuccess(
+								registration.getInstanceID(),
+								resourceId,
+								resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
+						}
 					}
 				}
 			}, getMainThreadExecutor());