You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:35 UTC

[16/52] [abbrv] flink git commit: [FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

[FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

This PR introduces a FatalErrorHandler and the MetricRegistry to the RM. The FatalErrorHandler is used to handle fatal errors. Additionally, the PR adds the MetricRegistry to the RM which can be used
to register metrics.

Apart from these changes the PR restructures the code of the RM a little bit and fixes some
blocking operations.

The PR also moves the TestingFatalErrorHandler into the util package of flink-runtime test. That
it is usable across multiple tests.

Introduce ResourceManagerRunner to handle errors in the ResourceManager

This closes #2655.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70af08c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70af08c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70af08c6

Branch: refs/heads/master
Commit: 70af08c62f1d30929a34314f8a5f704e2c44d5a1
Parents: a193425
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 17 16:03:02 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:24 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  |  35 +-
 .../resourcemanager/ResourceManager.java        | 463 +++++++++++--------
 .../ResourceManagerConfiguration.java           |  86 ++++
 .../StandaloneResourceManager.java              |  25 +-
 .../exceptions/ConfigurationException.java      |  38 ++
 .../exceptions/ResourceManagerException.java    |  40 ++
 .../exceptions/ResourceManagerRunner.java       | 102 ++++
 .../registration/WorkerRegistration.java        |   5 +-
 .../resourcemanager/ResourceManagerHATest.java  |  45 +-
 .../ResourceManagerJobMasterTest.java           |  59 ++-
 .../ResourceManagerTaskExecutorTest.java        |  87 ++--
 .../resourcemanager/TestingResourceManager.java |  53 ---
 .../TestingSlotManagerFactory.java              |  30 ++
 .../slotmanager/SlotProtocolTest.java           |  47 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  60 +--
 .../runtime/util/TestingFatalErrorHandler.java  |  83 ++++
 16 files changed, 863 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index d63f9a7..b005330 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,10 +32,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerRunner;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -52,7 +49,6 @@ import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-
 public class MiniCluster {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
@@ -82,7 +78,7 @@ public class MiniCluster {
 	private HighAvailabilityServices haServices;
 
 	@GuardedBy("lock")
-	private ResourceManager<?>[] resourceManagers;
+	private ResourceManagerRunner[] resourceManagerRunners;
 
 	@GuardedBy("lock")
 	private TaskManagerRunner[] taskManagerRunners;
@@ -231,7 +227,7 @@ public class MiniCluster {
 
 				// bring up the ResourceManager(s)
 				LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
-				resourceManagers = startResourceManagers(
+				resourceManagerRunners = startResourceManagers(
 						configuration, haServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
 
 				// bring up the TaskManager(s) for the mini cluster
@@ -303,8 +299,8 @@ public class MiniCluster {
 			jobDispatcher = null;
 		}
 
-		if (resourceManagers != null) {
-			for (ResourceManager<?> rm : resourceManagers) {
+		if (resourceManagerRunners != null) {
+			for (ResourceManagerRunner rm : resourceManagerRunners) {
 				if (rm != null) {
 					try {
 						rm.shutDown();
@@ -313,7 +309,7 @@ public class MiniCluster {
 					}
 				}
 			}
-			resourceManagers = null;
+			resourceManagerRunners = null;
 		}
 
 		// shut down the RpcServices
@@ -435,26 +431,27 @@ public class MiniCluster {
 		return new AkkaRpcService(actorSystem, askTimeout);
 	}
 
-	protected ResourceManager<?>[] startResourceManagers(
+	protected ResourceManagerRunner[] startResourceManagers(
 			Configuration configuration,
 			HighAvailabilityServices haServices,
 			MetricRegistry metricRegistry,
 			int numResourceManagers,
 			RpcService[] resourceManagerRpcServices) throws Exception {
 
-		final StandaloneResourceManager[] resourceManagers = new StandaloneResourceManager[numResourceManagers];
-		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); 
+		final ResourceManagerRunner[] resourceManagerRunners = new ResourceManagerRunner[numResourceManagers];
 
 		for (int i = 0; i < numResourceManagers; i++) {
-			resourceManagers[i] = new StandaloneResourceManager(
-					resourceManagerRpcServices[i],
-					haServices,
-					slotManagerFactory);
 
-			resourceManagers[i].start();
+			resourceManagerRunners[i] = new ResourceManagerRunner(
+				configuration,
+				resourceManagerRpcServices[i],
+				haServices,
+				metricRegistry);
+
+			resourceManagerRunners[i].start();
 		}
 
-		return resourceManagers;
+		return resourceManagerRunners;
 	}
 
 	protected TaskManagerRunner[] startTaskManagers(

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f1a5073..4161972 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -38,12 +38,15 @@ import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
 import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
 import org.apache.flink.runtime.resourcemanager.registration.JobMasterRegistration;
 import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -54,6 +57,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -61,6 +65,8 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -79,8 +85,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		extends RpcEndpoint<ResourceManagerGateway>
 		implements LeaderContender {
 
-	/** The exit code with which the process is stopped in case of a fatal error. */
-	protected static final int EXIT_CODE_FATAL_ERROR = -13;
+	/** Configuration of the resource manager */
+	private final ResourceManagerConfiguration resourceManagerConfiguration;
 
 	/** All currently registered JobMasterGateways scoped by JobID. */
 	private final Map<JobID, JobMasterRegistration> jobMasters;
@@ -97,6 +103,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	/** The factory to construct the SlotManager. */
 	private final SlotManagerFactory slotManagerFactory;
 
+	/** Registry to use for metrics */
+	private final MetricRegistry metricRegistry;
+
+	/** Fatal error handler */
+	private final FatalErrorHandler fatalErrorHandler;
+
 	/** The SlotManager created by the slotManagerFactory when the ResourceManager is started. */
 	private SlotManager slotManager;
 
@@ -107,64 +119,89 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	private volatile UUID leaderSessionID;
 
 	/** All registered listeners for status updates of the ResourceManager. */
-	private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
-
-	/** Default timeout for messages */
-	private final Time timeout = Time.seconds(5);
+	private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;
 
 	public ResourceManager(
 			RpcService rpcService,
+			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
-			SlotManagerFactory slotManagerFactory) {
+			SlotManagerFactory slotManagerFactory,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler) {
+
 		super(rpcService);
+
+		this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
 		this.slotManagerFactory = checkNotNull(slotManagerFactory);
-		this.jobMasters = new HashMap<>();
-		this.leaderListeners = new HashMap<>();
-		this.taskExecutors = new HashMap<>();
-		this.leaderSessionID = new UUID(0, 0);
-		infoMessageListeners = new HashMap<>();
+		this.metricRegistry = checkNotNull(metricRegistry);
+		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
+
+		this.jobMasters = new HashMap<>(4);
+		this.leaderListeners = new HashMap<>(4);
+		this.taskExecutors = new HashMap<>(8);
+		this.leaderSessionID = null;
+		infoMessageListeners = new ConcurrentHashMap<>(8);
 	}
 
+	// ------------------------------------------------------------------------
+	//  RPC lifecycle methods
+	// ------------------------------------------------------------------------
+
 	@Override
-	public void start() {
+	public void start() throws Exception {
 		// start a leader
+		super.start();
+
 		try {
-			super.start();
 			// SlotManager should start first
 			slotManager = slotManagerFactory.create(createResourceManagerServices());
+		} catch (Exception e) {
+			throw new ResourceManagerException("Could not create the slot manager.", e);
+		}
+
+		try {
 			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 			leaderElectionService.start(this);
+		} catch (Exception e) {
+			throw new ResourceManagerException("Could not start the leader election service.", e);
+		}
+
+		try {
 			// framework specific initialization
 			initialize();
-		} catch (Throwable e) {
-			log.error("A fatal error happened when starting the ResourceManager", e);
-			throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
+		} catch (Exception e) {
+			throw new ResourceManagerException("Could not initialize the resource manager.", e);
 		}
 	}
 
 	@Override
-	public void shutDown() {
+	public void shutDown() throws Exception {
+		Exception exception = null;
+
 		try {
 			leaderElectionService.stop();
-			clearState();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		clearState();
+
+		try {
 			super.shutDown();
-		} catch (Throwable e) {
-			log.error("A fatal error happened when shutdown the ResourceManager", e);
-			throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e);
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
-	}
 
-	/**
-	 * Gets the leader session id of current resourceManager.
-	 *
-	 * @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership.
-	 */
-	@VisibleForTesting
-	UUID getLeaderSessionID() {
-		return this.leaderSessionID;
+		if (exception != null) {
+			ExceptionUtils.rethrowException(exception, "Error while shutting the ResourceManager down.");
+		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  RPC methods
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Register a {@link JobMaster} at the resource manager.
 	 *
@@ -191,11 +228,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 					highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
 				jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
 			} catch (Exception e) {
-				log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
-				FlinkCompletableFuture<RegistrationResponse> responseFuture = new FlinkCompletableFuture<>();
-				responseFuture.complete(new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
-				return responseFuture;
+				log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e);
+
+				return FlinkCompletableFuture.<RegistrationResponse>completed(
+					new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
 			}
+
 			leaderListeners.put(jobID, jobIdLeaderListener);
 		}
 
@@ -211,6 +249,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 						throw new Exception("Invalid leader session id");
 					}
 
+					final Time timeout = resourceManagerConfiguration.getTimeout();
+
 					if (!jobIdLeaderListener.getLeaderID().get(timeout.getSize(), timeout.getUnit())
 							.equals(jobMasterLeaderId)) {
 						throw new Exception("Leader Id did not match");
@@ -224,10 +264,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 				@Override
 				public RegistrationResponse apply(JobMasterGateway jobMasterGateway, Throwable throwable) {
 
-					if (throwable != null) {
-						return new RegistrationResponse.Decline(throwable.getMessage());
-					}
-
+				if (throwable != null) {
+					return new RegistrationResponse.Decline(throwable.getMessage());
+				} else {
 					if (!leaderSessionID.equals(resourceManagerLeaderId)) {
 						log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" +
 								" did not equal the received leader session ID  {}",
@@ -252,10 +291,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 					if (existingRegistration != null) {
 						log.info("Replacing JobMaster registration for newly registered JobMaster with JobID {}.", jobID);
 					}
-					return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);
-
+					return new JobMasterRegistrationSuccess(
+						resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
+						resourceManagerLeaderId);
 				}
-			}, getMainThreadExecutor());
+			}
+		}, getMainThreadExecutor());
 	}
 
 	/**
@@ -274,38 +315,44 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		final ResourceID resourceID,
 		final SlotReport slotReport) {
 
-		return getRpcService().execute(new Callable<TaskExecutorGateway>() {
-			@Override
-			public TaskExecutorGateway call() throws Exception {
-				if (!leaderSessionID.equals(resourceManagerLeaderId)) {
-					log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did " +
-							"not equal the received leader session ID  {}",
-						resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
-					throw new Exception("Invalid leader session id");
-				}
-				return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class)
-					.get(timeout.toMilliseconds(), timeout.getUnit());
-			}
-		}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
-			@Override
-			public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
-				if (throwable != null) {
-					return new RegistrationResponse.Decline(throwable.getMessage());
-				} else {
-					WorkerRegistration oldRegistration = taskExecutors.remove(resourceID);
-					if (oldRegistration != null) {
-						// TODO :: suggest old taskExecutor to stop itself
-						log.info("Replacing old instance of worker for ResourceID {}", resourceID);
+		if (leaderSessionID.equals(resourceManagerLeaderId)) {
+			Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+
+			return taskExecutorGatewayFuture.handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
+				@Override
+				public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+					if (throwable != null) {
+						return new RegistrationResponse.Decline(throwable.getMessage());
+					} else {
+						WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(resourceID);
+						if (oldRegistration != null) {
+							// TODO :: suggest old taskExecutor to stop itself
+							log.info("Replacing old instance of worker for ResourceID {}", resourceID);
+						}
+
+						WorkerType newWorker = workerStarted(resourceID);
+						WorkerRegistration<WorkerType> registration =
+							new WorkerRegistration<>(taskExecutorGateway, newWorker);
+
+						taskExecutors.put(resourceID, registration);
+						slotManager.registerTaskExecutor(resourceID, registration, slotReport);
+
+						return new TaskExecutorRegistrationSuccess(
+							registration.getInstanceID(),
+							resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
 					}
-					WorkerType newWorker = workerStarted(resourceID);
-					WorkerRegistration<WorkerType> registration =
-						new WorkerRegistration<>(taskExecutorGateway, newWorker);
-					taskExecutors.put(resourceID, registration);
-					slotManager.registerTaskExecutor(resourceID, registration, slotReport);
-					return new TaskExecutorRegistrationSuccess(registration.getInstanceID(), 5000);
 				}
-			}
-		}, getMainThreadExecutor());
+			}, getMainThreadExecutor());
+		} else {
+			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did " +
+					"not equal the received leader session ID  {}",
+				resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
+
+			return FlinkCompletableFuture.<RegistrationResponse>completed(
+				new RegistrationResponse.Decline("Discard registration because the leader id " +
+					resourceManagerLeaderId + " does not match the expected leader id " +
+					leaderSessionID + '.'));
+		}
 	}
 
 	/**
@@ -337,33 +384,91 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 * Notification from a TaskExecutor that a slot has become available
 	 * @param resourceManagerLeaderId TaskExecutor's resource manager leader id
 	 * @param instanceID TaskExecutor's instance id
-	 * @param slotID The slot id of the available slot
+	 * @param slotId The slot id of the available slot
 	 * @return SlotAvailableReply
 	 */
 	@RpcMethod
 	public void notifySlotAvailable(
 			final UUID resourceManagerLeaderId,
 			final InstanceID instanceID,
-			final SlotID slotID) {
+			final SlotID slotId) {
 
 		if (resourceManagerLeaderId.equals(leaderSessionID)) {
-			final ResourceID resourceId = slotID.getResourceID();
+			final ResourceID resourceId = slotId.getResourceID();
 			WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceId);
 
 			if (registration != null) {
-				InstanceID registrationInstanceID = registration.getInstanceID();
-				if (registrationInstanceID.equals(instanceID)) {
-					runAsync(new Runnable() {
-						@Override
-						public void run() {
-							slotManager.notifySlotAvailable(resourceId, slotID);
-						}
-					});
+				InstanceID registrationId = registration.getInstanceID();
+
+				if (registrationId.equals(instanceID)) {
+					slotManager.notifySlotAvailable(resourceId, slotId);
+				} else {
+					log.debug("Invalid registration id for slot available message. This indicates an" +
+						" outdated request.");
 				}
+			} else {
+				log.debug("Could not find registration for resource id {}. Discarding the slot available" +
+					"message {}.", resourceId, slotId);
 			}
+		} else {
+			log.debug("Discarding notify slot available message for slot {}, because the " +
+				"leader id {} did not match the expected leader id {}.", slotId,
+				resourceManagerLeaderId, leaderSessionID);
+		}
+	}
+
+	/**
+	 * Registers an info message listener
+	 *
+	 * @param address address of infoMessage listener to register to this resource manager
+	 */
+	@RpcMethod
+	public void registerInfoMessageListener(final String address) {
+		if(infoMessageListeners.containsKey(address)) {
+			log.warn("Receive a duplicate registration from info message listener on ({})", address);
+		} else {
+			Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(address, InfoMessageListenerRpcGateway.class);
+
+			infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
+				@Override
+				public void accept(InfoMessageListenerRpcGateway gateway) {
+					log.info("Receive a registration from info message listener on ({})", address);
+					infoMessageListeners.put(address, gateway);
+				}
+			}, getMainThreadExecutor());
+
+			infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+				@Override
+				public Void apply(Throwable failure) {
+					log.warn("Receive a registration from unreachable info message listener on ({})", address);
+					return null;
+				}
+			}, getRpcService().getExecutor());
 		}
 	}
 
+	/**
+	 * Unregisters an info message listener
+	 *
+	 * @param address of the  info message listener to unregister from this resource manager
+	 *
+	 */
+	@RpcMethod
+	public void unRegisterInfoMessageListener(final String address) {
+		infoMessageListeners.remove(address);
+	}
+
+	/**
+	 * Cleanup application and shut down cluster
+	 *
+	 * @param finalStatus
+	 * @param optionalDiagnostics
+	 */
+	@RpcMethod
+	public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
+		log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
+		shutDownApplication(finalStatus, optionalDiagnostics);
+	}
 
 	// ------------------------------------------------------------------------
 	//  Leader Contender
@@ -372,23 +477,35 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	/**
 	 * Callback method when current resourceManager is granted leadership
 	 *
-	 * @param leaderSessionID unique leadershipID
+	 * @param newLeaderSessionID unique leadershipID
 	 */
 	@Override
-	public void grantLeadership(final UUID leaderSessionID) {
+	public void grantLeadership(final UUID newLeaderSessionID) {
 		runAsync(new Runnable() {
 			@Override
 			public void run() {
-				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
-				// confirming the leader session ID might be blocking,
-				leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-				ResourceManager.this.leaderSessionID = leaderSessionID;
+				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
+
+				// clear the state if we've been the leader before
+				if (leaderSessionID != null) {
+					clearState();
+				}
+
+				leaderSessionID = newLeaderSessionID;
+
+				getRpcService().execute(new Runnable() {
+					@Override
+					public void run() {
+						// confirming the leader session ID might be blocking,
+						leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+					}
+				});
 			}
 		});
 	}
 
 	/**
-	 * Callback method when current resourceManager lose leadership.
+	 * Callback method when current resourceManager loses leadership.
 	 */
 	@Override
 	public void revokeLeadership() {
@@ -396,7 +513,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			@Override
 			public void run() {
 				log.info("ResourceManager {} was revoked leadership.", getAddress());
+
 				clearState();
+
+				leaderSessionID = null;
 			}
 		});
 	}
@@ -408,106 +528,98 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 */
 	@Override
 	public void handleError(final Exception exception) {
-		log.error("ResourceManager received an error from the LeaderElectionService.", exception);
-		// terminate ResourceManager in case of an error
-		shutDown();
+		onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
 	}
 
 	/**
-	 * Registers an infoMessage listener
+	 * This method should be called by the framework once it detects that a currently registered
+	 * task executor has failed.
 	 *
-	 * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+	 * @param resourceID Id of the worker that has failed.
+	 * @param message An informational message that explains why the worker failed.
 	 */
-	@RpcMethod
-	public void registerInfoMessageListener(final String infoMessageListenerAddress) {
-		if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
-			log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
-		} else {
-			Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
-
-			infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
-				@Override
-				public void accept(InfoMessageListenerRpcGateway gateway) {
-					log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
-					infoMessageListeners.put(infoMessageListenerAddress, gateway);
-				}
-			}, getMainThreadExecutor());
+	public void notifyWorkerFailed(final ResourceID resourceID, final String message) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
 
-			infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-				@Override
-				public Void apply(Throwable failure) {
-					log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress);
-					return null;
+				if (workerRegistration != null) {
+					log.info("Task manager {} failed because {}.", resourceID, message);
+					// TODO :: suggest failed task executor to stop itself
+					slotManager.notifyTaskManagerFailure(resourceID);
+				} else {
+					log.debug("Could not find a registered task manager with the process id {}.", resourceID);
 				}
-			}, getMainThreadExecutor());
-		}
-	}
-
-	/**
-	 * Unregisters an infoMessage listener
-	 *
-	 * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
-	 *
-	 */
-	@RpcMethod
-	public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) {
-		infoMessageListeners.remove(infoMessageListenerAddress);
+			}
+		});
 	}
 
-	/**
-	 * Cleanup application and shut down cluster
-	 *
-	 * @param finalStatus
-	 * @param optionalDiagnostics
-	 */
-	@RpcMethod
-	public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
-		log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
-		shutDownApplication(finalStatus, optionalDiagnostics);
-	}
+	// ------------------------------------------------------------------------
+	//  Error Handling
+	// ------------------------------------------------------------------------
 
 	/**
-	 * This method should be called by the framework once it detects that a currently registered task executor has failed.
+	 * Notifies the ResourceManager that a fatal error has occurred and it cannot proceed.
+	 * This method should be used when asynchronous threads want to notify the
+	 * ResourceManager of a fatal error.
 	 *
-	 * @param resourceID Id of the worker that has failed.
-	 * @param message An informational message that explains why the worker failed.
+	 * @param t The exception describing the fatal error
 	 */
-	public void notifyWorkerFailed(final ResourceID resourceID, String message) {
+	void onFatalErrorAsync(final Throwable t) {
 		runAsync(new Runnable() {
 			@Override
 			public void run() {
-				WorkerType worker = taskExecutors.remove(resourceID).getWorker();
-				if (worker != null) {
-					// TODO :: suggest failed task executor to stop itself
-					slotManager.notifyTaskManagerFailure(resourceID);
-				}
+				onFatalError(t);
 			}
 		});
 	}
 
 	/**
-	 * Gets the number of currently started TaskManagers.
+	 * Notifies the ResourceManager that a fatal error has occurred and it cannot proceed.
+	 * This method must only be called from within the ResourceManager's main thread.
 	 *
-	 * @return The number of currently started TaskManagers.
+	 * @param t The exception describing the fatal error
 	 */
-	public int getNumberOfStartedTaskManagers() {
-		return taskExecutors.size();
+	void onFatalError(Throwable t) {
+		log.error("Fatal error occurred.", t);
+		fatalErrorHandler.onFatalError(t);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Testing methods
+	// ------------------------------------------------------------------------
+
 	/**
-	 * Notifies the resource manager of a fatal error.
+	 * Gets the leader session id of current resourceManager.
 	 *
-	 * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
-	 * such a way that a high-availability setting would restart this or fail over
-	 * to another master.
+	 * @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership.
 	 */
-	public void onFatalError(final String message, final Throwable error) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				fatalError(message, error);
+	@VisibleForTesting
+	UUID getLeaderSessionID() {
+		return leaderSessionID;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Internal methods
+	// ------------------------------------------------------------------------
+
+	private void clearState() {
+		jobMasters.clear();
+		taskExecutors.clear();
+		slotManager.clearState();
+		Iterator<JobIdLeaderListener> leaderListenerIterator =
+			leaderListeners.values().iterator();
+		while (leaderListenerIterator.hasNext()) {
+			JobIdLeaderListener listener = leaderListenerIterator.next();
+			try {
+				listener.stopService();
+			} catch (Exception e) {
+				onFatalError(e);
 			}
-		});
+			leaderListenerIterator.remove();
+		}
+		leaderSessionID = new UUID(0, 0);
 	}
 
 	// ------------------------------------------------------------------------
@@ -522,15 +634,6 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	protected abstract void initialize() throws Exception;
 
 	/**
-	 * Notifies the resource master of a fatal error.
-	 *
-	 * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
-	 * such a way that a high-availability setting would restart this or fail over
-	 * to another master.
-	 */
-	protected abstract void fatalError(String message, Throwable error);
-
-	/**
 	 * The framework specific code for shutting down the application. This should report the
 	 * application's final status and shut down the resource manager cleanly.
 	 *
@@ -560,7 +663,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	// ------------------------------------------------------------------------
 
 	public void sendInfoMessage(final String message) {
-		runAsync(new Runnable() {
+		getRpcService().execute(new Runnable() {
 			@Override
 			public void run() {
 				InfoMessage infoMessage = new InfoMessage(message);
@@ -675,23 +778,5 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			ResourceManager.this.handleError(exception);
 		}
 	}
-
-	private void clearState() {
-		jobMasters.clear();
-		taskExecutors.clear();
-		slotManager.clearState();
-		Iterator<JobIdLeaderListener> leaderListenerIterator =
-			leaderListeners.values().iterator();
-		while (leaderListenerIterator.hasNext()) {
-			JobIdLeaderListener listener = leaderListenerIterator.next();
-			try {
-				listener.stopService();
-			} catch (Exception e) {
-				handleError(e);
-			}
-			leaderListenerIterator.remove();
-		}
-		leaderSessionID = new UUID(0, 0);
-	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
new file mode 100644
index 0000000..920f1fc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Resource manager configuration
+ */
+public class ResourceManagerConfiguration {
+
+	private final Time timeout;
+	private final Time heartbeatInterval;
+
+	public ResourceManagerConfiguration(Time timeout, Time heartbeatInterval) {
+		this.timeout = Preconditions.checkNotNull(timeout);
+		this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval);
+	}
+
+	public Time getTimeout() {
+		return timeout;
+	}
+
+	public Time getHeartbeatInterval() {
+		return heartbeatInterval;
+	}
+
+	// --------------------------------------------------------------------------
+	// Static factory methods
+	// --------------------------------------------------------------------------
+
+	public static ResourceManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
+		ConfigOption<String> timeoutOption = ConfigOptions
+			.key(ConfigConstants.AKKA_ASK_TIMEOUT)
+			.defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+
+		final String strTimeout = configuration.getString(timeoutOption);
+		final Time timeout;
+
+		try {
+			timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
+		} catch (NumberFormatException e) {
+			throw new ConfigurationException("Could not parse the resource manager's timeout " +
+				"value " + timeoutOption + '.', e);
+		}
+
+		ConfigOption<String> heartbeatIntervalOption = ConfigOptions
+			.key(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL)
+			.defaultValue(timeout.toString());
+
+		final String strHeartbeatInterval = configuration.getString(heartbeatIntervalOption);
+		final Time heartbeatInterval;
+
+		try {
+			heartbeatInterval = Time.milliseconds(Duration.apply(strHeartbeatInterval).toMillis());
+		} catch (NumberFormatException e) {
+			throw new ConfigurationException("Could not parse the resource manager's heartbeat interval " +
+				"value " + timeoutOption + '.', e);
+		}
+
+		return new ResourceManagerConfiguration(timeout, heartbeatInterval);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index f9f55f8..926be0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -22,7 +22,9 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 /**
@@ -33,10 +35,20 @@ import org.apache.flink.runtime.rpc.RpcService;
  */
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
-	public StandaloneResourceManager(RpcService rpcService,
+	public StandaloneResourceManager(
+			RpcService rpcService,
+			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
-			SlotManagerFactory slotManagerFactory) {
-		super(rpcService, highAvailabilityServices, slotManagerFactory);
+			SlotManagerFactory slotManagerFactory,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler) {
+		super(
+			rpcService,
+			resourceManagerConfiguration,
+			highAvailabilityServices,
+			slotManagerFactory,
+			metricRegistry,
+			fatalErrorHandler);
 	}
 
 	@Override
@@ -45,13 +57,6 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 	}
 
 	@Override
-	protected void fatalError(final String message, final Throwable error) {
-		log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error);
-		// kill this process
-		System.exit(EXIT_CODE_FATAL_ERROR);
-	}
-
-	@Override
 	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
new file mode 100644
index 0000000..f081fff
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.exceptions;
+
+/**
+ * Base class for configuration related exception which occur when creating a configuration.
+ */
+public class ConfigurationException extends Exception {
+	private static final long serialVersionUID = 3971647332059381556L;
+
+	public ConfigurationException(String message) {
+		super(message);
+	}
+
+	public ConfigurationException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public ConfigurationException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerException.java
new file mode 100644
index 0000000..6b4d646
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.exceptions;
+
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+
+/**
+ * Base class for {@link ResourceManager} exceptions.
+ */
+public class ResourceManagerException extends Exception {
+	private static final long serialVersionUID = -5503307426519195160L;
+
+	public ResourceManagerException(String message) {
+		super(message);
+	}
+
+	public ResourceManagerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public ResourceManagerException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
new file mode 100644
index 0000000..1e6f04c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.exceptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services
+ * and handles fatal errors by shutting the resource manager down.
+ */
+public class ResourceManagerRunner implements FatalErrorHandler {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerRunner.class);
+
+	private final Object lock = new Object();
+
+	private final ResourceManager<?> resourceManager;
+
+	public ResourceManagerRunner(
+			final Configuration configuration,
+			final RpcService rpcService,
+			final HighAvailabilityServices highAvailabilityServices,
+			final MetricRegistry metricRegistry) throws ConfigurationException {
+
+		Preconditions.checkNotNull(configuration);
+		Preconditions.checkNotNull(rpcService);
+		Preconditions.checkNotNull(highAvailabilityServices);
+		Preconditions.checkNotNull(metricRegistry);
+
+		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
+		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
+
+		this.resourceManager = new StandaloneResourceManager(
+			rpcService,
+			resourceManagerConfiguration,
+			highAvailabilityServices,
+			slotManagerFactory,
+			metricRegistry,
+			this);
+	}
+
+	//-------------------------------------------------------------------------------------
+	// Lifecycle management
+	//-------------------------------------------------------------------------------------
+
+	public void start() throws Exception {
+		resourceManager.start();
+	}
+
+	public void shutDown() throws Exception {
+		shutDownInternally();
+	}
+
+	private void shutDownInternally() throws Exception {
+		synchronized (lock) {
+			resourceManager.shutDown();
+		}
+	}
+
+	//-------------------------------------------------------------------------------------
+	// Fatal error handler
+	//-------------------------------------------------------------------------------------
+
+	@Override
+	public void onFatalError(Throwable exception) {
+		LOG.error("Encountered fatal error.", exception);
+
+		try {
+			shutDownInternally();
+		} catch (Exception e) {
+			LOG.error("Could not properly shut down the resource manager.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index ff28f94..7ee7a1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.registration;
 
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 
@@ -29,11 +30,11 @@ public class WorkerRegistration<WorkerType extends Serializable> extends TaskExe
 
 	private static final long serialVersionUID = -2062957799469434614L;
 
-	private WorkerType worker;
+	private final WorkerType worker;
 
 	public WorkerRegistration(TaskExecutorGateway taskExecutorGateway, WorkerType worker) {
 		super(taskExecutorGateway);
-		this.worker = worker;
+		this.worker = Preconditions.checkNotNull(worker);
 	}
 
 	public WorkerType getWorker() {

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index ce1fdca..cb38e6e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -18,24 +18,20 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
-import org.apache.flink.runtime.rpc.MainThreadExecutable;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.UUID;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * resourceManager HA test, including grant leadership and revoke leadership
@@ -44,37 +40,38 @@ public class ResourceManagerHATest {
 
 	@Test
 	public void testGrantAndRevokeLeadership() throws Exception {
-		// mock a RpcService which will return a special RpcGateway when call its startServer method,
-		// the returned RpcGateway directly executes runAsync calls
-		TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class);
-		doCallRealMethod().when(gateway).runAsync(any(Runnable.class));
-
-		RpcService rpcService = mock(RpcService.class);
-		when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway);
+		RpcService rpcService = new TestingSerialRpcService();
 
 		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
+		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+		SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+		MetricRegistry metricRegistry = mock(MetricRegistry.class);
+		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
 		final ResourceManager resourceManager =
-			new TestingResourceManager(rpcService, highAvailabilityServices);
+			new StandaloneResourceManager(
+				rpcService,
+				resourceManagerConfiguration,
+				highAvailabilityServices,
+				slotManagerFactory,
+				metricRegistry,
+				testingFatalErrorHandler);
 		resourceManager.start();
 		// before grant leadership, resourceManager's leaderId is null
-		Assert.assertEquals(new UUID(0,0), resourceManager.getLeaderSessionID());
+		Assert.assertEquals(null, resourceManager.getLeaderSessionID());
 		final UUID leaderId = UUID.randomUUID();
 		leaderElectionService.isLeader(leaderId);
 		// after grant leadership, resourceManager's leaderId has value
 		Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID());
 		// then revoke leadership, resourceManager's leaderId is null again
 		leaderElectionService.notLeader();
-		Assert.assertEquals(new UUID(0,0), resourceManager.getLeaderSessionID());
-	}
+		Assert.assertEquals(null, resourceManager.getLeaderSessionID());
 
-	private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutable, StartStoppable, RpcGateway {
-		@Override
-		public void runAsync(Runnable runnable) {
-			runnable.run();
+		if (testingFatalErrorHandler.hasExceptionOccurred()) {
+			testingFatalErrorHandler.rethrowError();
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 14afd0e..7b8d254 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -19,14 +19,19 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,13 +66,18 @@ public class ResourceManagerJobMasterTest {
 		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
 		UUID jmLeaderID = UUID.randomUUID();
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
 		// test response successful
 		Future<RegistrationResponse> successfulFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderID, jobMasterAddress, jobID);
 		RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
 		assertTrue(response instanceof JobMasterRegistrationSuccess);
+
+		if (testingFatalErrorHandler.hasExceptionOccurred()) {
+			testingFatalErrorHandler.rethrowError();
+		}
 	}
 
 	/**
@@ -80,13 +90,18 @@ public class ResourceManagerJobMasterTest {
 		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
 		UUID jmLeaderID = UUID.randomUUID();
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
 		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(differentLeaderSessionID, jmLeaderID, jobMasterAddress, jobID);
 		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+
+		if (testingFatalErrorHandler.hasExceptionOccurred()) {
+			testingFatalErrorHandler.rethrowError();
+		}
 	}
 
 	/**
@@ -98,7 +113,8 @@ public class ResourceManagerJobMasterTest {
 		JobID jobID = mockJobMaster(jobMasterAddress);
 		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
@@ -106,6 +122,10 @@ public class ResourceManagerJobMasterTest {
 		UUID differentLeaderSessionID = UUID.randomUUID();
 		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(rmLeaderSessionId, differentLeaderSessionID, jobMasterAddress, jobID);
 		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+
+		if (testingFatalErrorHandler.hasExceptionOccurred()) {
+			testingFatalErrorHandler.rethrowError();
+		}
 	}
 
 	/**
@@ -117,7 +137,8 @@ public class ResourceManagerJobMasterTest {
 		JobID jobID = mockJobMaster(jobMasterAddress);
 		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
@@ -125,6 +146,10 @@ public class ResourceManagerJobMasterTest {
 		String invalidAddress = "/jobMasterAddress2";
 		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, invalidAddress, jobID);
 		assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+
+		if (testingFatalErrorHandler.hasExceptionOccurred()) {
+			testingFatalErrorHandler.rethrowError();
+		}
 	}
 
 	/**
@@ -136,7 +161,8 @@ public class ResourceManagerJobMasterTest {
 		JobID jobID = mockJobMaster(jobMasterAddress);
 		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
@@ -145,6 +171,10 @@ public class ResourceManagerJobMasterTest {
 		Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
 		RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
 		assertTrue(response instanceof RegistrationResponse.Decline);
+
+		if (testingFatalErrorHandler.hasExceptionOccurred()) {
+			testingFatalErrorHandler.rethrowError();
+		}
 	}
 
 	private JobID mockJobMaster(String jobMasterAddress) {
@@ -154,11 +184,26 @@ public class ResourceManagerJobMasterTest {
 		return jobID;
 	}
 
-	private ResourceManager createAndStartResourceManager(TestingLeaderElectionService resourceManagerLeaderElectionService, JobID jobID, TestingLeaderRetrievalService jobMasterLeaderRetrievalService) {
+	private ResourceManager createAndStartResourceManager(
+			TestingLeaderElectionService resourceManagerLeaderElectionService,
+			JobID jobID,
+			TestingLeaderRetrievalService jobMasterLeaderRetrievalService,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
 		highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
-		ResourceManager resourceManager = new TestingResourceManager(rpcService, highAvailabilityServices);
+
+		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+		SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+		MetricRegistry metricRegistry = mock(MetricRegistry.class);
+
+		ResourceManager resourceManager = new StandaloneResourceManager(
+			rpcService,
+			resourceManagerConfiguration,
+			highAvailabilityServices,
+			slotManagerFactory,
+			metricRegistry,
+			fatalErrorHandler);
 		resourceManager.start();
 		return resourceManager;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index a577c26..4640eab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -18,15 +18,19 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,7 +38,6 @@ import org.junit.Test;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -53,13 +56,16 @@ public class ResourceManagerTaskExecutorTest {
 
 	private UUID leaderSessionId;
 
+	private TestingFatalErrorHandler testingFatalErrorHandler;
+
 	@Before
 	public void setup() throws Exception {
 		rpcService = new TestingSerialRpcService();
 
 		taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
 		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
-		resourceManager = createAndStartResourceManager(rmLeaderElectionService);
+		testingFatalErrorHandler = new TestingFatalErrorHandler();
+		resourceManager = createAndStartResourceManager(rmLeaderElectionService, testingFatalErrorHandler);
 		leaderSessionId = grantLeadership(rmLeaderElectionService);
 	}
 
@@ -73,18 +79,24 @@ public class ResourceManagerTaskExecutorTest {
 	 */
 	@Test
 	public void testRegisterTaskExecutor() throws Exception {
-		// test response successful
-		Future<RegistrationResponse> successfulFuture =
-			resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
-		RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
-		assertTrue(response instanceof TaskExecutorRegistrationSuccess);
-
-		// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
-		Future<RegistrationResponse> duplicateFuture =
-			resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
-		RegistrationResponse duplicateResponse = duplicateFuture.get();
-		assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
-		assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
+		try {
+			// test response successful
+			Future<RegistrationResponse> successfulFuture =
+				resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
+			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+			// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
+			Future<RegistrationResponse> duplicateFuture =
+				resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
+			RegistrationResponse duplicateResponse = duplicateFuture.get();
+			assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
+			assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
+		} finally {
+			if (testingFatalErrorHandler.hasExceptionOccurred()) {
+				testingFatalErrorHandler.rethrowError();
+			}
+		}
 	}
 
 	/**
@@ -92,11 +104,17 @@ public class ResourceManagerTaskExecutorTest {
 	 */
 	@Test
 	public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
-		// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
-		UUID differentLeaderSessionID = UUID.randomUUID();
-		Future<RegistrationResponse> unMatchedLeaderFuture =
-			resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport);
-		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+		try {
+			// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
+			UUID differentLeaderSessionID = UUID.randomUUID();
+			Future<RegistrationResponse> unMatchedLeaderFuture =
+				resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport);
+			assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+		} finally {
+			if (testingFatalErrorHandler.hasExceptionOccurred()) {
+				testingFatalErrorHandler.rethrowError();
+			}
+		}
 	}
 
 	/**
@@ -104,11 +122,17 @@ public class ResourceManagerTaskExecutorTest {
 	 */
 	@Test
 	public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
-		// test throw exception when receive a registration from taskExecutor which takes invalid address
-		String invalidAddress = "/taskExecutor2";
-		Future<RegistrationResponse> invalidAddressFuture =
-			resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport);
-		assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+		try {
+			// test throw exception when receive a registration from taskExecutor which takes invalid address
+			String invalidAddress = "/taskExecutor2";
+			Future<RegistrationResponse> invalidAddressFuture =
+				resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport);
+			assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+		} finally {
+			if (testingFatalErrorHandler.hasExceptionOccurred()) {
+				testingFatalErrorHandler.rethrowError();
+			}
+		}
 	}
 
 	private ResourceID mockTaskExecutor(String taskExecutorAddress) {
@@ -118,11 +142,22 @@ public class ResourceManagerTaskExecutorTest {
 		return taskExecutorResourceID;
 	}
 
-	private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) {
+	private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+		MetricRegistry metricRegistry = mock(MetricRegistry.class);
+
+
 		StandaloneResourceManager resourceManager =
-			new TestingResourceManager(rpcService, highAvailabilityServices);
+			new StandaloneResourceManager(
+				rpcService,
+				resourceManagerConfiguration,
+				highAvailabilityServices,
+				slotManagerFactory,
+				metricRegistry,
+				fatalErrorHandler);
 		resourceManager.start();
 		return resourceManager;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
deleted file mode 100644
index 6b4ca14..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
-import org.apache.flink.runtime.rpc.RpcService;
-
-public class TestingResourceManager extends StandaloneResourceManager {
-
-	public TestingResourceManager(RpcService rpcService) {
-		this(rpcService, new TestingHighAvailabilityServices());
-	}
-
-	public TestingResourceManager(
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices) {
-		this(rpcService, highAvailabilityServices, new TestingSlotManagerFactory());
-	}
-
-	public TestingResourceManager(
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			SlotManagerFactory slotManagerFactory) {
-		super(rpcService, highAvailabilityServices, slotManagerFactory);
-	}
-
-	private static class TestingSlotManagerFactory implements SlotManagerFactory {
-
-		@Override
-		public SlotManager create(ResourceManagerServices rmServices) {
-			return new TestingSlotManager(rmServices);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java
new file mode 100644
index 0000000..6b5f6b2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+
+public class TestingSlotManagerFactory implements SlotManagerFactory {
+
+	@Override
+	public SlotManager create(ResourceManagerServices rmServices) {
+		return new TestingSlotManager(rmServices);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 86cd1f8..08ceb86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -30,14 +30,18 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.TestingSlotManager;
 import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
@@ -102,9 +106,17 @@ public class SlotProtocolTest extends TestLogger {
 		TestingLeaderElectionService rmLeaderElectionService =
 			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
 
+		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+
 		final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		SpiedResourceManager resourceManager =
-			new SpiedResourceManager(testRpcService, testingHaServices, slotManagerFactory);
+			new SpiedResourceManager(
+				testRpcService,
+				resourceManagerConfiguration,
+				testingHaServices,
+				slotManagerFactory,
+				mock(MetricRegistry.class),
+				mock(FatalErrorHandler.class));
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -193,16 +205,26 @@ public class SlotProtocolTest extends TestLogger {
 			.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
 		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
+		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+
 		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
-		TestingResourceManager resourceManager =
-			Mockito.spy(new TestingResourceManager(testRpcService, testingHaServices, slotManagerFactory));
+		ResourceManager<ResourceID> resourceManager =
+			Mockito.spy(new StandaloneResourceManager(
+				testRpcService,
+				resourceManagerConfiguration,
+				testingHaServices,
+				slotManagerFactory,
+				mock(MetricRegistry.class),
+				mock(FatalErrorHandler.class)));
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
 
+		Thread.sleep(1000);
+
 		Future<RegistrationResponse> registrationFuture =
 			resourceManager.registerJobMaster(rmLeaderID, jmLeaderID, jmAddress, jobID);
 		try {
-			registrationFuture.get(5, TimeUnit.SECONDS);
+			registrationFuture.get(5L, TimeUnit.SECONDS);
 		} catch (Exception e) {
 			Assert.fail("JobManager registration Future didn't become ready.");
 		}
@@ -258,15 +280,24 @@ public class SlotProtocolTest extends TestLogger {
 		return rmLeaderElectionService;
 	}
 
-	private static class SpiedResourceManager extends TestingResourceManager {
+	private static class SpiedResourceManager extends StandaloneResourceManager {
 
 		private int startNewWorkerCalled = 0;
 
 		public SpiedResourceManager(
 				RpcService rpcService,
+				ResourceManagerConfiguration resourceManagerConfiguration,
 				HighAvailabilityServices highAvailabilityServices,
-				SlotManagerFactory slotManagerFactory) {
-			super(rpcService, highAvailabilityServices, slotManagerFactory);
+				SlotManagerFactory slotManagerFactory,
+				MetricRegistry metricRegistry,
+				FatalErrorHandler fatalErrorHandler) {
+			super(
+				rpcService,
+				resourceManagerConfiguration,
+				highAvailabilityServices,
+				slotManagerFactory,
+				metricRegistry,
+				fatalErrorHandler);
 		}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/70af08c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 638ec56..1ef7140 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -67,12 +67,12 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
-import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
@@ -83,15 +83,12 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.Matchers;
 import org.powermock.api.mockito.PowerMockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -449,7 +446,7 @@ public class TaskExecutorTest extends TestLogger {
 					any(Time.class));
 		} finally {
 			// check if a concurrent error occurred
-			testingFatalErrorHandler.rethrowException();
+			testingFatalErrorHandler.rethrowError();
 
 			rpc.stopService();
 		}
@@ -556,63 +553,12 @@ public class TaskExecutorTest extends TestLogger {
 			assertTrue(taskSlotTable.isSlotFree(1));
 		} finally {
 			// check if a concurrent error occurred
-			testingFatalErrorHandler.rethrowException();
+			testingFatalErrorHandler.rethrowError();
 
 			rpc.stopService();
 		}
 	}
 
-	private static class TestingFatalErrorHandler implements FatalErrorHandler {
-		private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
-		private final AtomicReference<Throwable> atomicThrowable;
-
-		public TestingFatalErrorHandler() {
-			atomicThrowable = new AtomicReference<>(null);
-		}
-
-		public void rethrowException() throws TestingException {
-			Throwable throwable = atomicThrowable.get();
-
-			if (throwable != null) {
-				throw new TestingException(throwable);
-			}
-		}
-
-		public boolean hasExceptionOccurred() {
-			return atomicThrowable.get() != null;
-		}
-
-		public Throwable getException() {
-			return atomicThrowable.get();
-		}
-
-		@Override
-		public void onFatalError(Throwable exception) {
-			LOG.error("OnFatalError:", exception);
-			atomicThrowable.compareAndSet(null, exception);
-		}
-
-		//------------------------------------------------------------------
-		// static utility classes
-		//------------------------------------------------------------------
-
-		private static final class TestingException extends Exception {
-			public TestingException(String message) {
-				super(message);
-			}
-
-			public TestingException(String message, Throwable cause) {
-				super(message, cause);
-			}
-
-			public TestingException(Throwable cause) {
-				super(cause);
-			}
-
-			private static final long serialVersionUID = -4648195335470914498L;
-		}
-	}
-
 	/**
 	 * Tests that all allocation requests for slots are ignored if the slot has been reported as
 	 * free by the TaskExecutor but this report hasn't been confirmed by the ResourceManager.