You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/06/29 06:42:00 UTC

[4/4] flink git commit: [FLINK-6379] Remove internal methods from MesosResourceManagerGateway

[FLINK-6379] Remove internal methods from MesosResourceManagerGateway

Some internal methods that are required for the interplay between the TaskMonitor,
LaunchCoordinator and the MesosResourceManager were exposed as RPC methods. In order
to keep the RPC interface as lean as possible, these methods have been removed from
the RPC interface; the MesosResourceManager now invokes them internally via runAsync.

Fix checkstyle violations


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/901a9caf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/901a9caf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/901a9caf

Branch: refs/heads/master
Commit: 901a9caf4cf49b6cd2f18f0d6f04045312e96d84
Parents: 4bb488c
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 27 15:04:17 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 29 08:41:34 2017 +0200

----------------------------------------------------------------------
 .../clusterframework/MesosResourceManager.java  | 328 ++++++++++++-------
 .../MesosResourceManagerActions.java            |  60 ++++
 .../MesosResourceManagerGateway.java            |  59 ----
 .../store/MesosWorkerStore.java                 |   1 +
 .../flink/mesos/scheduler/SchedulerGateway.java |  89 -----
 .../flink/mesos/scheduler/SchedulerProxyV2.java | 103 ------
 .../MesosResourceManagerTest.java               | 118 ++++---
 .../resourcemanager/ResourceManager.java        |  16 +-
 .../resourcemanager/ResourceManagerRunner.java  |   2 +-
 .../StandaloneResourceManager.java              |   2 +-
 .../StandaloneResourceManagerGateway.java       |  25 --
 .../taskexecutor/TaskExecutorITCase.java        |   3 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |   2 +-
 .../apache/flink/yarn/YarnResourceManager.java  |   2 +-
 .../flink/yarn/YarnResourceManagerGateway.java  |  27 --
 15 files changed, 341 insertions(+), 496 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index bb20060..43b58bc 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -18,15 +18,6 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import com.netflix.fenzo.TaskRequest;
-import com.netflix.fenzo.TaskScheduler;
-import com.netflix.fenzo.VirtualMachineLease;
-import com.netflix.fenzo.functions.Action1;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
@@ -34,13 +25,11 @@ import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
-import org.apache.flink.mesos.scheduler.SchedulerProxyV2;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
 import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
 import org.apache.flink.mesos.scheduler.Tasks;
 import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
 import org.apache.flink.mesos.scheduler.messages.Disconnected;
-import org.apache.flink.mesos.scheduler.messages.Error;
 import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
 import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
 import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
@@ -66,14 +55,23 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
 import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -81,42 +79,40 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import scala.Option;
+
 /**
  * The Mesos implementation of the resource manager.
  */
-public class MesosResourceManager extends ResourceManager<MesosResourceManagerGateway, RegisteredMesosWorkerNode> {
+public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerNode> {
 	protected static final Logger LOG = LoggerFactory.getLogger(MesosResourceManager.class);
 
-	/** The Flink configuration */
+	/** The Flink configuration. */
 	private final Configuration flinkConfig;
 
-	/** The Mesos configuration (master and framework info) */
+	/** The Mesos configuration (master and framework info). */
 	private final MesosConfiguration mesosConfig;
 
-	/** The TaskManager container parameters (like container memory size) */
+	/** The TaskManager container parameters (like container memory size). */
 	private final MesosTaskManagerParameters taskManagerParameters;
 
-	/** Container specification for launching a TM */
+	/** Container specification for launching a TM. */
 	private final ContainerSpecification taskManagerContainerSpec;
 
-	/** Resolver for HTTP artifacts */
+	/** Resolver for HTTP artifacts. */
 	private final MesosArtifactResolver artifactResolver;
 
-	/** Persistent storage of allocated containers */
+	/** Persistent storage of allocated containers. */
 	private final MesosWorkerStore workerStore;
 
-	/** A local actor system for using the helper actors */
+	/** A local actor system for using the helper actors. */
 	private final ActorSystem actorSystem;
 
-	/** Callback handler for the asynchronous Mesos scheduler */
-	private SchedulerProxyV2 schedulerCallbackHandler;
-
-	/** Mesos scheduler driver */
+	/** Mesos scheduler driver. */
 	private SchedulerDriver schedulerDriver;
 
-	/** an adapter to receive messages from Akka actors */
-	@VisibleForTesting
-	ActorRef selfActor;
+	/** an adapter to receive messages from Akka actors. */
+	private ActorRef selfActor;
 
 	private ActorRef connectionMonitor;
 
@@ -126,7 +122,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 	private ActorRef reconciliationCoordinator;
 
-	/** planning state related to workers - package private for unit test purposes */
+	/** planning state related to workers - package private for unit test purposes. */
 	final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
 	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
 	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
@@ -181,7 +177,8 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 	protected ActorRef createSelfActor() {
 		return actorSystem.actorOf(
-			AkkaAdapter.createActorProps(getSelf()),"ResourceManager");
+			Props.create(AkkaAdapter.class, this),
+			"ResourceManager");
 	}
 
 	protected ActorRef createConnectionMonitor() {
@@ -190,19 +187,21 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 			"connectionMonitor");
 	}
 
-	protected ActorRef createTaskRouter() {
+	protected ActorRef createTaskMonitor(SchedulerDriver schedulerDriver) {
 		return actorSystem.actorOf(
 			Tasks.createActorProps(Tasks.class, flinkConfig, schedulerDriver, TaskMonitor.class),
 			"tasks");
 	}
 
-	protected ActorRef createLaunchCoordinator() {
+	protected ActorRef createLaunchCoordinator(
+			SchedulerDriver schedulerDriver,
+			ActorRef selfActor) {
 		return actorSystem.actorOf(
 			LaunchCoordinator.createActorProps(LaunchCoordinator.class, selfActor, flinkConfig, schedulerDriver, createOptimizer()),
 			"launchCoordinator");
 	}
 
-	protected ActorRef createReconciliationCoordinator() {
+	protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriver) {
 		return actorSystem.actorOf(
 			ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, flinkConfig, schedulerDriver),
 			"reconciliationCoordinator");
@@ -220,14 +219,10 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		// start the worker store
 		try {
 			workerStore.start();
-		}
-		catch(Exception e) {
+		} catch (Exception e) {
 			throw new ResourceManagerException("Unable to initialize the worker store.", e);
 		}
 
-		// create the scheduler driver to communicate with Mesos
-		schedulerCallbackHandler = new SchedulerProxyV2(getSelf());
-
 		// register with Mesos
 		// TODO : defer connection until RM acquires leadership
 
@@ -242,27 +237,27 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 				LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
 				frameworkInfo.setId(frameworkID.get());
 			}
-		}
-		catch(Exception e) {
+		} catch (Exception e) {
 			throw new ResourceManagerException("Unable to recover the framework ID.", e);
 		}
 
 		MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
 		MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
-		schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
+		schedulerDriver = initializedMesosConfig.createDriver(
+			new MesosResourceManagerSchedulerCallback(),
+			false);
 
 		// create supporting actors
 		selfActor = createSelfActor();
 		connectionMonitor = createConnectionMonitor();
-		launchCoordinator = createLaunchCoordinator();
-		reconciliationCoordinator = createReconciliationCoordinator();
-		taskMonitor = createTaskRouter();
+		launchCoordinator = createLaunchCoordinator(schedulerDriver, selfActor);
+		reconciliationCoordinator = createReconciliationCoordinator(schedulerDriver);
+		taskMonitor = createTaskMonitor(schedulerDriver);
 
 		// recover state
 		try {
 			recoverWorkers();
-		}
-		catch(Exception e) {
+		} catch (Exception e) {
 			throw new ResourceManagerException("Unable to recover Mesos worker state.", e);
 		}
 
@@ -288,7 +283,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		if (!tasksFromPreviousAttempts.isEmpty()) {
 			LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
 
-			List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
+			List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
 
 			for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
 				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
@@ -310,31 +305,38 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 			}
 
 			// tell the launch coordinator about prior assignments
-			if(toAssign.size() >= 1) {
+			if (toAssign.size() >= 1) {
 				launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), selfActor);
 			}
 		}
 	}
 
 	@Override
-	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+	protected void shutDownApplication(
+			ApplicationStatus finalStatus,
+			String optionalDiagnostics) throws ResourceManagerException {
 		LOG.info("Shutting down and unregistering as a Mesos framework.");
+
+		Exception exception = null;
+
 		try {
 			// unregister the framework, which implicitly removes all tasks.
 			schedulerDriver.stop(false);
-		}
-		catch(Exception ex) {
-			LOG.warn("unable to unregister the framework", ex);
+		} catch (Exception ex) {
+			exception = new Exception("Could not unregister the Mesos framework.", ex);
 		}
 
 		try {
 			workerStore.stop(true);
-		}
-		catch(Exception ex) {
-			LOG.warn("unable to stop the worker state store", ex);
+		} catch (Exception ex) {
+			exception = ExceptionUtils.firstOrSuppressed(
+				new Exception("Could not stop the Mesos worker store.", ex),
+				exception);
 		}
 
-		LOG.info("Shutdown completed.");
+		if (exception != null) {
+			throw new ResourceManagerException("Could not properly shut down the Mesos application.", exception);
+		}
 	}
 
 	@Override
@@ -356,8 +358,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 			// tell the launch coordinator to launch the new tasks
 			launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor);
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			onFatalErrorAsync(new ResourceManagerException("Unable to request new workers.", ex));
 		}
 	}
@@ -387,16 +388,14 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 	}
 
 	// ------------------------------------------------------------------------
-	//  RPC methods
+	//  Mesos specific methods
 	// ------------------------------------------------------------------------
 
-	@RpcMethod
-	public void registered(Registered message) {
+	protected void registered(Registered message) {
 		connectionMonitor.tell(message, selfActor);
 		try {
 			workerStore.setFrameworkID(Option.apply(message.frameworkId()));
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			onFatalError(new ResourceManagerException("Unable to store the assigned framework ID.", ex));
 			return;
 		}
@@ -409,8 +408,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 	/**
 	 * Called when reconnected to Mesos following a failover event.
 	 */
-	@RpcMethod
-	public void reregistered(ReRegistered message) {
+	protected void reregistered(ReRegistered message) {
 		connectionMonitor.tell(message, selfActor);
 		launchCoordinator.tell(message, selfActor);
 		reconciliationCoordinator.tell(message, selfActor);
@@ -420,8 +418,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 	/**
 	 * Called when disconnected from Mesos.
 	 */
-	@RpcMethod
-	public void disconnected(Disconnected message) {
+	protected void disconnected(Disconnected message) {
 		connectionMonitor.tell(message, selfActor);
 		launchCoordinator.tell(message, selfActor);
 		reconciliationCoordinator.tell(message, selfActor);
@@ -431,26 +428,38 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 	/**
 	 * Called when resource offers are made to the framework.
 	 */
-	@RpcMethod
-	public void resourceOffers(ResourceOffers message) {
+	protected void resourceOffers(ResourceOffers message) {
 		launchCoordinator.tell(message, selfActor);
 	}
 
 	/**
 	 * Called when resource offers are rescinded.
 	 */
-	@RpcMethod
-	public void offerRescinded(OfferRescinded message) {
+	protected void offerRescinded(OfferRescinded message) {
 		launchCoordinator.tell(message, selfActor);
 	}
 
 	/**
+	 * Handles a task status update from Mesos.
+	 */
+	protected void statusUpdate(StatusUpdate message) {
+		taskMonitor.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		schedulerDriver.acknowledgeStatusUpdate(message.status());
+	}
+
+	protected void frameworkMessage(FrameworkMessage message) {}
+
+	protected void slaveLost(SlaveLost message) {}
+
+	protected void executorLost(ExecutorLost message) {}
+
+	/**
 	 * Accept offers as advised by the launch coordinator.
 	 *
-	 * Acceptance is routed through the RM to update the persistent state before
+	 * <p>Acceptance is routed through the RM to update the persistent state before
 	 * forwarding the message to Mesos.
 	 */
-	@RpcMethod
 	public void acceptOffers(AcceptOffers msg) {
 		try {
 			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size());
@@ -481,26 +490,14 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 
 			// send the acceptance message to Mesos
 			schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			onFatalError(new ResourceManagerException("unable to accept offers", ex));
 		}
 	}
 
 	/**
-	 * Handles a task status update from Mesos.
-	 */
-	@RpcMethod
-	public void statusUpdate(StatusUpdate message) {
-		taskMonitor.tell(message, selfActor);
-		reconciliationCoordinator.tell(message, selfActor);
-		schedulerDriver.acknowledgeStatusUpdate(message.status());
-	}
-
-	/**
 	 * Handles a reconciliation request from a task monitor.
 	 */
-	@RpcMethod
 	public void reconcile(ReconciliationCoordinator.Reconcile message) {
 		// forward to the reconciliation coordinator
 		reconciliationCoordinator.tell(message, selfActor);
@@ -509,7 +506,6 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 	/**
 	 * Handles a termination notification from a task monitor.
 	 */
-	@RpcMethod
 	public void taskTerminated(TaskMonitor.TaskTerminated message) {
 		Protos.TaskID taskID = message.taskID();
 		Protos.TaskStatus status = message.status();
@@ -520,13 +516,12 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		boolean existed;
 		try {
 			existed = workerStore.removeWorker(taskID);
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			onFatalError(new ResourceManagerException("unable to remove worker", ex));
 			return;
 		}
 
-		if(!existed) {
+		if (!existed) {
 			LOG.info("Received a termination notice for an unrecognized worker: {}", id);
 			return;
 		}
@@ -550,23 +545,6 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		closeTaskManagerConnection(id, new Exception(status.getMessage()));
 	}
 
-	@RpcMethod
-	public void frameworkMessage(FrameworkMessage message) {}
-
-	@RpcMethod
-	public void slaveLost(SlaveLost message) {}
-
-	@RpcMethod
-	public void executorLost(ExecutorLost message) {}
-
-	/**
-	 * Called when an error is reported by the scheduler callback.
-	 */
-	@RpcMethod
-	public void error(Error message) {
-		onFatalError(new ResourceManagerException("Connection to Mesos failed", new Exception(message.message())));
-	}
-
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -650,29 +628,139 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa
 		};
 	}
 
+	private class MesosResourceManagerSchedulerCallback implements Scheduler {
+
+		@Override
+		public void registered(SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.registered(new Registered(frameworkId, masterInfo));
+				}
+			});
+		}
+
+		@Override
+		public void reregistered(SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.reregistered(new ReRegistered(masterInfo));
+				}
+			});
+		}
+
+		@Override
+		public void resourceOffers(SchedulerDriver driver, final List<Protos.Offer> offers) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.resourceOffers(new ResourceOffers(offers));
+				}
+			});
+		}
+
+		@Override
+		public void offerRescinded(SchedulerDriver driver, final Protos.OfferID offerId) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.offerRescinded(new OfferRescinded(offerId));
+				}
+			});
+		}
+
+		@Override
+		public void statusUpdate(SchedulerDriver driver, final Protos.TaskStatus status) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.statusUpdate(new StatusUpdate(status));
+				}
+			});
+		}
+
+		@Override
+		public void frameworkMessage(SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] data) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.frameworkMessage(new FrameworkMessage(executorId, slaveId, data));
+				}
+			});
+		}
+
+		@Override
+		public void disconnected(SchedulerDriver driver) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.disconnected(new Disconnected());
+				}
+			});
+		}
+
+		@Override
+		public void slaveLost(SchedulerDriver driver, final Protos.SlaveID slaveId) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.slaveLost(new SlaveLost(slaveId));
+				}
+			});
+		}
+
+		@Override
+		public void executorLost(SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final int status) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					MesosResourceManager.this.executorLost(new ExecutorLost(executorId, slaveId, status));
+				}
+			});
+		}
+
+		@Override
+		public void error(SchedulerDriver driver, final String message) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					onFatalError(new ResourceManagerException(message));
+				}
+			});
+		}
+	}
+
 	/**
 	 * Adapts incoming Akka messages as RPC calls to the resource manager.
 	 */
-	static class AkkaAdapter extends UntypedActor {
-		private final MesosResourceManagerGateway gateway;
-		AkkaAdapter(MesosResourceManagerGateway gateway) {
-			this.gateway = gateway;
-		}
+	private class AkkaAdapter extends UntypedActor {
 		@Override
-		public void onReceive(Object message) throws Exception {
+		public void onReceive(final Object message) throws Exception {
 			if (message instanceof ReconciliationCoordinator.Reconcile) {
-				gateway.reconcile((ReconciliationCoordinator.Reconcile) message);
+				runAsync(new Runnable() {
+					@Override
+					public void run() {
+						reconcile((ReconciliationCoordinator.Reconcile) message);
+					}
+				});
 			} else if (message instanceof TaskMonitor.TaskTerminated) {
-				gateway.taskTerminated((TaskMonitor.TaskTerminated) message);
+				runAsync(new Runnable() {
+					@Override
+					public void run() {
+						taskTerminated((TaskMonitor.TaskTerminated) message);
+					}
+				});
 			} else if (message instanceof AcceptOffers) {
-				gateway.acceptOffers((AcceptOffers) message);
+				runAsync(new Runnable() {
+					@Override
+					public void run() {
+						acceptOffers((AcceptOffers) message);
+					}
+				});
 			} else {
 				MesosResourceManager.LOG.error("unrecognized message: " + message);
 			}
 		}
-
-		public static Props createActorProps(MesosResourceManagerGateway gateway) {
-			return Props.create(AkkaAdapter.class, gateway);
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerActions.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerActions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerActions.java
new file mode 100644
index 0000000..e1b0300
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerActions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+
+/**
+ * Actions defined by the MesosResourceManager.
+ *
+ * <p>These are called by the MesosResourceManager components such
+ * as {@link LaunchCoordinator}, and {@link TaskMonitor}.
+ */
+public interface MesosResourceManagerActions {
+
+	/**
+	 * Accept the given offers as advised by the launch coordinator.
+	 *
+	 * <p>Note: This method is a callback for the {@link LaunchCoordinator}.
+	 *
+	 * @param offersToAccept Offers to accept from Mesos
+	 */
+	void acceptOffers(AcceptOffers offersToAccept);
+
+	/**
+	 * Trigger reconciliation with the Mesos master.
+	 *
+	 * <p>Note: This method is a callback for the {@link TaskMonitor}.
+	 *
+	 * @param reconciliationRequest Message containing the tasks which shall be reconciled
+	 */
+	void reconcile(ReconciliationCoordinator.Reconcile reconciliationRequest);
+
+	/**
+	 * Notify that the given Mesos task has been terminated.
+	 *
+	 * <p>Note: This method is a callback for the {@link TaskMonitor}.
+	 *
+	 * @param terminatedTask Message containing the terminated task
+	 */
+	void taskTerminated(TaskMonitor.TaskTerminated terminatedTask);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
deleted file mode 100644
index 70ed47d..0000000
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerGateway.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.runtime.clusterframework;
-
-import org.apache.flink.mesos.scheduler.LaunchCoordinator;
-import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
-import org.apache.flink.mesos.scheduler.SchedulerGateway;
-import org.apache.flink.mesos.scheduler.TaskMonitor;
-import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-
-/**
- * The {@link MesosResourceManager}'s RPC gateway interface.
- */
-public interface MesosResourceManagerGateway extends ResourceManagerGateway, SchedulerGateway {
-
-	/**
-	 * Accept the given offers as advised by the launch coordinator.
-	 *
-	 * Note: This method is a callback for the {@link LaunchCoordinator}.
-	 *
-	 * @param offersToAccept Offers to accept from Mesos
-	 */
-	void acceptOffers(AcceptOffers offersToAccept);
-
-	/**
-	 * Trigger reconciliation with the Mesos master.
-	 *
-	 * Note: This method is a callback for the {@link TaskMonitor}.
-	 *
-	 * @param reconciliationRequest Message containing the tasks which shall be reconciled
-	 */
-	void reconcile(ReconciliationCoordinator.Reconcile reconciliationRequest);
-
-	/**
-	 * Notify that the given Mesos task has been terminated.
-	 *
-	 * Note: This method is a callback for the {@link TaskMonitor}.
-	 *
-	 * @param terminatedTask Message containing the terminated task
-	 */
-	void taskTerminated(TaskMonitor.TaskTerminated terminatedTask);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
index a9ec892..06aeb59 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.mesos.runtime.clusterframework.store;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
 import org.apache.mesos.Protos;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java
deleted file mode 100644
index 0dea4e7..0000000
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerGateway.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.scheduler;
-
-import org.apache.flink.mesos.scheduler.messages.Disconnected;
-import org.apache.flink.mesos.scheduler.messages.Error;
-import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
-import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
-import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
-import org.apache.flink.mesos.scheduler.messages.ReRegistered;
-import org.apache.flink.mesos.scheduler.messages.Registered;
-import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
-import org.apache.flink.mesos.scheduler.messages.SlaveLost;
-import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.flink.runtime.rpc.RpcGateway;
-
-/**
- * A scheduler's RPC gateway interface.
- *
- * Implemented by RPC endpoints that accept Mesos scheduler messages.
- */
-public interface SchedulerGateway extends RpcGateway {
-
-	/**
-	 * Called when connected to Mesos as a new framework.
-	 */
-	void registered(Registered message);
-
-	/**
-	 * Called when reconnected to Mesos following a failover event.
-	 */
-	void reregistered(ReRegistered message);
-
-	/**
-	 * Called when disconnected from Mesos.
-	 */
-	void disconnected(Disconnected message);
-
-	/**
-	 * Called when resource offers are made to the framework.
-	 */
-	void resourceOffers(ResourceOffers message);
-
-	/**
-	 * Called when resource offers are rescinded.
-	 */
-	void offerRescinded(OfferRescinded message);
-
-	/**
-	 * Called when a status update arrives from the Mesos master.
-	 */
-	void statusUpdate(StatusUpdate message);
-
-	/**
-	 * Called when a framework message arrives from a custom Mesos task executor.
-	 */
-	void frameworkMessage(FrameworkMessage message);
-
-	/**
-	 * Called when a Mesos slave is lost.
-	 */
-	void slaveLost(SlaveLost message);
-
-	/**
-	 * Called when a custom Mesos task executor is lost.
-	 */
-	void executorLost(ExecutorLost message);
-
-	/**
-	 * Called when an error is reported by the scheduler callback.
-	 */
-	void error(Error message);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java
deleted file mode 100644
index 21c8346..0000000
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxyV2.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.scheduler;
-
-import org.apache.flink.mesos.scheduler.messages.Disconnected;
-import org.apache.flink.mesos.scheduler.messages.Error;
-import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
-import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
-import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
-import org.apache.flink.mesos.scheduler.messages.ReRegistered;
-import org.apache.flink.mesos.scheduler.messages.Registered;
-import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
-import org.apache.flink.mesos.scheduler.messages.SlaveLost;
-import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Scheduler;
-import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
-import org.apache.mesos.SchedulerDriver;
-
-import java.util.List;
-
-/**
- * This class reacts to callbacks from the Mesos scheduler driver.
- *
- * Forwards incoming messages to the {@link MesosResourceManager} RPC gateway.
- *
- * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
- */
-public class SchedulerProxyV2 implements Scheduler {
-
-	/** The actor to which we report the callbacks */
-	private final SchedulerGateway gateway;
-
-	public SchedulerProxyV2(SchedulerGateway gateway) {
-		this.gateway = gateway;
-	}
-
-	@Override
-	public void registered(SchedulerDriver driver, Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo) {
-		gateway.registered(new Registered(frameworkId, masterInfo));
-	}
-
-	@Override
-	public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) {
-		gateway.reregistered(new ReRegistered(masterInfo));
-	}
-
-	@Override
-	public void disconnected(SchedulerDriver driver) {
-		gateway.disconnected(new Disconnected());
-	}
-
-	@Override
-	public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
-		gateway.resourceOffers(new ResourceOffers(offers));
-	}
-
-	@Override
-	public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) {
-		gateway.offerRescinded(new OfferRescinded(offerId));
-	}
-
-	@Override
-	public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) {
-		gateway.statusUpdate(new StatusUpdate(status));
-	}
-
-	@Override
-	public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, byte[] data) {
-		gateway.frameworkMessage(new FrameworkMessage(executorId, slaveId, data));
-	}
-
-	@Override
-	public void slaveLost(SchedulerDriver driver, Protos.SlaveID slaveId) {
-		gateway.slaveLost(new SlaveLost(slaveId));
-	}
-
-	@Override
-	public void executorLost(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, int status) {
-		gateway.executorLost(new ExecutorLost(executorId, slaveId, status));
-	}
-
-	@Override
-	public void error(SchedulerDriver driver, String message) {
-		gateway.error(new Error(message));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 929e927..6e6a59c 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -18,23 +18,20 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestProbe;
-import com.netflix.fenzo.ConstraintEvaluator;
-import junit.framework.AssertionFailedError;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
-import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
-import org.apache.flink.mesos.scheduler.messages.*;
-import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -55,7 +52,9 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.resourcemanager.*;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -66,30 +65,51 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import com.netflix.fenzo.ConstraintEvaluator;
+import junit.framework.AssertionFailedError;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.collection.JavaConverters;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+
 import static java.util.Collections.singletonList;
 import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState;
 import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID;
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * General tests for the Mesos resource manager component (v2).
@@ -151,13 +171,24 @@ public class MesosResourceManagerTest extends TestLogger {
 		}
 
 		@Override
-		protected ActorRef createConnectionMonitor() { return connectionMonitor.ref(); }
+		protected ActorRef createConnectionMonitor() {
+			return connectionMonitor.ref();
+		}
+
 		@Override
-		protected ActorRef createTaskRouter() { return taskRouter.ref(); }
+		protected ActorRef createTaskMonitor(SchedulerDriver schedulerDriver) {
+			return taskRouter.ref();
+		}
+
 		@Override
-		protected ActorRef createLaunchCoordinator() { return launchCoordinator.ref(); }
+		protected ActorRef createLaunchCoordinator(SchedulerDriver schedulerDriver, ActorRef selfActorRef) {
+			return launchCoordinator.ref();
+		}
+
 		@Override
-		protected ActorRef createReconciliationCoordinator() { return reconciliationCoordinator.ref(); }
+		protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriver) {
+			return reconciliationCoordinator.ref();
+		}
 
 		@Override
 		protected void closeTaskManagerConnection(ResourceID resourceID, Exception cause) {
@@ -179,7 +210,7 @@ public class MesosResourceManagerTest extends TestLogger {
 		// RM
 		ResourceManagerConfiguration rmConfiguration;
 		ResourceID rmResourceID;
-		static final String rmAddress = "/resourceManager";
+		static final String RM_ADDRESS = "/resourceManager";
 		TestingMesosResourceManager resourceManager;
 
 		// domain objects for test purposes
@@ -227,7 +258,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			resourceManager =
 				new TestingMesosResourceManager(
 					rpcService,
-					rmAddress,
+					RM_ADDRESS,
 					rmResourceID,
 					rmConfiguration,
 					rmServices.highAvailabilityServices,
@@ -252,7 +283,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			task3Executor = mockTaskExecutor(task3);
 
 			// JobMaster
-			jobMaster1 = mockJobMaster(rmServices, new JobID(1,0));
+			jobMaster1 = mockJobMaster(rmServices, new JobID(1, 0));
 		}
 
 		/**
@@ -290,7 +321,7 @@ public class MesosResourceManagerTest extends TestLogger {
 						rmActions = invocation.getArgumentAt(2, ResourceManagerActions.class);
 						return null;
 					}
-				}).when(slotManager).start(any(UUID.class),any(Executor.class),any(ResourceManagerActions.class));
+				}).when(slotManager).start(any(UUID.class), any(Executor.class), any(ResourceManagerActions.class));
 
 				when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenReturn(true);
 			}
@@ -669,41 +700,4 @@ public class MesosResourceManagerTest extends TestLogger {
 			resourceManager.taskRouter.expectMsgClass(Disconnected.class);
 		}};
 	}
-
-	/**
-	 * Test Mesos scheduler error.
-	 */
-	@Test
-	public void testError() throws Exception {
-		new Context() {{
-			when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
-			startResourceManager();
-			resourceManager.error(new Error("test"));
-			assertTrue(fatalErrorHandler.hasExceptionOccurred());
-		}};
-	}
-
-	@Test
-	public void testAdapter() throws Exception {
-		Protos.TaskID task1 = Protos.TaskID.newBuilder().setValue("taskmanager-00001").build();
-		Protos.TaskStatus status1 = Protos.TaskStatus.newBuilder().setTaskId(task1).setState(Protos.TaskState.TASK_KILLED).build();
-		String host1 = "host1";
-
-		MesosResourceManagerGateway gateway = mock(MesosResourceManagerGateway.class);
-		ActorRef adapter = system.actorOf(MesosResourceManager.AkkaAdapter.createActorProps(gateway));
-
-		List<Protos.TaskStatus> tasks = Collections.singletonList(status1);
-		ReconciliationCoordinator.Reconcile msg1 = new ReconciliationCoordinator.Reconcile(
-			JavaConverters.asScalaBufferConverter(tasks).asScala(), false);
-		adapter.tell(msg1, ActorRef.noSender());
-		verify(gateway, timeout(1000L)).reconcile(eq(msg1));
-
-		TaskMonitor.TaskTerminated msg2 = new TaskMonitor.TaskTerminated(task1, status1);
-		adapter.tell(msg2, ActorRef.noSender());
-		verify(gateway, timeout(1000L)).taskTerminated(eq(msg2));
-
-		AcceptOffers msg3 = new AcceptOffers(host1, Collections.<Protos.OfferID>emptyList(), Collections.<Protos.Offer.Operation>emptyList());
-		adapter.tell(msg3, ActorRef.noSender());
-		verify(gateway, timeout(1000L)).acceptOffers(eq(msg3));
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index df452e3..6e7c6af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -83,8 +83,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-public abstract class ResourceManager<C extends ResourceManagerGateway, WorkerType extends Serializable>
-		extends RpcEndpoint<C>
+public abstract class ResourceManager<WorkerType extends Serializable>
+		extends RpcEndpoint<ResourceManagerGateway>
 		implements LeaderContender {
 
 	public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
@@ -609,8 +609,13 @@ public abstract class ResourceManager<C extends ResourceManagerGateway, WorkerTy
 	 */
 	@RpcMethod
 	public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
-		log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
-		shutDownApplication(finalStatus, optionalDiagnostics);
+		log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, optionalDiagnostics);
+
+		try {
+			shutDownApplication(finalStatus, optionalDiagnostics);
+		} catch (ResourceManagerException e) {
+			log.warn("Could not properly shutdown the application.", e);
+		}
 	}
 
 	@RpcMethod
@@ -880,8 +885,9 @@ public abstract class ResourceManager<C extends ResourceManagerGateway, WorkerTy
 	 *
 	 * @param finalStatus The application status to report.
 	 * @param optionalDiagnostics An optional diagnostics message.
+	 * @throws ResourceManagerException if the application could not be shut down.
 	 */
-	protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics);
+	protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) throws ResourceManagerException;
 
 	/**
 	 * Allocates a resource using the resource profile.

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 12d3a7d..d0c411c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -42,7 +42,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 
 	private final ResourceManagerRuntimeServices resourceManagerRuntimeServices;
 
-	private final ResourceManager<? extends ResourceManagerGateway, ?> resourceManager;
+	private final ResourceManager<?> resourceManager;
 
 	public ResourceManagerRunner(
 			final ResourceID resourceId,

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index afeddc4..a921a29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.rpc.RpcService;
  *
  * This ResourceManager doesn't acquire new resources.
  */
-public class StandaloneResourceManager extends ResourceManager<StandaloneResourceManagerGateway, ResourceID> {
+public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
 	public StandaloneResourceManager(
 			RpcService rpcService,

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java
deleted file mode 100644
index 6c8de66..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerGateway.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-/**
- * The {@link StandaloneResourceManager}'s RPC gateway interface.
- */
-public interface StandaloneResourceManagerGateway extends ResourceManagerGateway {
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 85ed950..6a0bd87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -47,7 +47,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -127,7 +126,7 @@ public class TaskExecutorITCase extends TestLogger {
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime());
 
-		ResourceManager<StandaloneResourceManagerGateway,ResourceID> resourceManager = new StandaloneResourceManager(
+		ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
 			rpcService,
 			FlinkResourceManager.RESOURCE_MANAGER_NAME,
 			rmResourceId,

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index cef3378..2ad9065 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -193,7 +193,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
 	}
 
-	private ResourceManager<?,?> createResourceManager(Configuration config) throws Exception {
+	private ResourceManager<?> createResourceManager(Configuration config) throws Exception {
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
 		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(config);
 		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 4ee30f4..6099d18 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -63,7 +63,7 @@ import scala.concurrent.duration.FiniteDuration;
  * The yarn implementation of the resource manager. Used when the system is started
  * via the resource framework YARN.
  */
-public class YarnResourceManager extends ResourceManager<YarnResourceManagerGateway, ResourceID> implements AMRMClientAsync.CallbackHandler {
+public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
 
 	/** The process environment variables. */
 	private final Map<String, String> env;

http://git-wip-us.apache.org/repos/asf/flink/blob/901a9caf/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java
deleted file mode 100644
index 485fb90..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerGateway.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-
-/**
- * The {@link YarnResourceManager}'s RPC gateway interface.
- */
-public interface YarnResourceManagerGateway extends ResourceManagerGateway {
-}