You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/03/23 12:59:10 UTC

[2/2] flink git commit: [FLINK-4354] [heartbeat] Implement heartbeat logic between TaskManager and ResourceManager

[FLINK-4354] [heartbeat] Implement heartbeat logic between TaskManager and ResourceManager

This closes #3591.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd90672f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd90672f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd90672f

Branch: refs/heads/master
Commit: fd90672f9ccf7a0e02e5eb9c6251dc3d451ce8ba
Parents: d20fb09
Author: Zhijiang <wa...@aliyun.com>
Authored: Wed Mar 22 15:12:33 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Mar 23 13:58:44 2017 +0100

----------------------------------------------------------------------
 .../heartbeat/TestingHeartbeatServices.java     |  52 ++++++++
 .../flink/runtime/jobmaster/JobMaster.java      |   4 +-
 .../flink/runtime/minicluster/MiniCluster.java  |   5 +-
 .../RegistrationConnectionListener.java         |  40 ++++++
 .../resourcemanager/ResourceManager.java        | 117 ++++++++++++++----
 .../resourcemanager/ResourceManagerGateway.java |  16 +++
 .../resourcemanager/ResourceManagerRunner.java  |   9 +-
 .../StandaloneResourceManager.java              |   5 +
 .../runtime/taskexecutor/TaskExecutor.java      | 106 ++++++++++++++--
 .../taskexecutor/TaskExecutorGateway.java       |  14 +++
 .../TaskExecutorRegistrationSuccess.java        |  16 ++-
 ...TaskExecutorToResourceManagerConnection.java |  21 +++-
 .../clusterframework/ResourceManagerTest.java   | 108 +++++++++++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java  |  38 +-----
 .../resourcemanager/ResourceManagerHATest.java  |   7 ++
 .../ResourceManagerJobMasterTest.java           |   7 ++
 .../ResourceManagerTaskExecutorTest.java        |   7 ++
 .../slotmanager/SlotProtocolTest.java           |  17 +++
 .../taskexecutor/TaskExecutorITCase.java        |   3 +
 .../runtime/taskexecutor/TaskExecutorTest.java  | 121 +++++++++++++++++--
 .../yarn/YarnFlinkApplicationMasterRunner.java  |  21 ++--
 .../apache/flink/yarn/YarnResourceManager.java  |   8 +-
 22 files changed, 640 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
new file mode 100644
index 0000000..e628db5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+public class TestingHeartbeatServices extends HeartbeatServices {
+
+	private final ScheduledExecutor scheduledExecutorToUse;
+
+	public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
+		super(heartbeatInterval, heartbeatTimeout);
+
+		this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse);
+	}
+
+	@Override
+	public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+		ResourceID resourceId,
+		HeartbeatListener<I, O> heartbeatListener,
+		ScheduledExecutor scheduledExecutor,
+		Logger log) {
+
+		return new HeartbeatManagerSenderImpl<>(
+			heartbeatInterval,
+			heartbeatTimeout,
+			resourceId,
+			heartbeatListener,
+			org.apache.flink.runtime.concurrent.Executors.directExecutor(),
+			scheduledExecutorToUse,
+			log);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 243b57f..81fc541 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1043,11 +1043,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		@Override
 		public void notifyHeartbeatTimeout(ResourceID resourceID) {
-			log.info("Task manager with id {} timed out.", resourceID);
+			log.info("Task manager with id {} heartbeat timed out.", resourceID);
 
 			getSelf().disconnectTaskManager(
 				resourceID,
-				new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out."));
+				new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out."));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 25c4aba..2cfba7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -241,7 +241,7 @@ public class MiniCluster {
 				// bring up the ResourceManager(s)
 				LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
 				resourceManagerRunners = startResourceManagers(
-						configuration, haServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
+						configuration, haServices, heartbeatServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
 
 				// bring up the TaskManager(s) for the mini cluster
 				LOG.info("Starting {} TaskManger(s)", numTaskManagers);
@@ -508,6 +508,7 @@ public class MiniCluster {
 	protected ResourceManagerRunner[] startResourceManagers(
 			Configuration configuration,
 			HighAvailabilityServices haServices,
+			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			int numResourceManagers,
 			RpcService[] resourceManagerRpcServices) throws Exception {
@@ -517,9 +518,11 @@ public class MiniCluster {
 		for (int i = 0; i < numResourceManagers; i++) {
 
 			resourceManagerRunners[i] = new ResourceManagerRunner(
+				ResourceID.generate(),
 				configuration,
 				resourceManagerRpcServices[i],
 				haServices,
+				heartbeatServices,
 				metricRegistry);
 
 			resourceManagerRunners[i].start();

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
new file mode 100644
index 0000000..360f982
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+/**
+ * Classes which want to be notified about the registration result by the {@link RegisteredRpcConnection}
+ * have to implement this interface.
+ */
+public interface RegistrationConnectionListener<Success extends RegistrationResponse.Success> {
+
+	/**
+	 * This method is called by the {@link RegisteredRpcConnection} when the registration succeeds.
+	 *
+	 * @param success The concrete response information for successful registration.
+	 */
+	void onRegistrationSuccess(Success success);
+
+	/**
+	 * This method is called by the {@link RegisteredRpcConnection} when the registration fails.
+	 *
+	 * @param failure The exception which causes the registration failure.
+	 */
+	void onRegistrationFailure(Throwable failure);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 1430a49..9a7a790 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -30,6 +30,10 @@ import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.heartbeat.HeartbeatListener;
+import org.apache.flink.runtime.heartbeat.HeartbeatManager;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -64,6 +68,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -81,6 +86,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		extends RpcEndpoint<ResourceManagerGateway>
 		implements LeaderContender {
 
+	/** Unique id of the resource manager */
+	private final ResourceID resourceId;
+
 	/** Configuration of the resource manager */
 	private final ResourceManagerConfiguration resourceManagerConfiguration;
 
@@ -96,6 +104,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	/** High availability services for leader retrieval and election. */
 	private final HighAvailabilityServices highAvailabilityServices;
 
+	/** The heartbeat manager with task managers. */
+	private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
+
 	/** The factory to construct the SlotManager. */
 	private final SlotManagerFactory slotManagerFactory;
 
@@ -118,9 +129,11 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;
 
 	public ResourceManager(
+			ResourceID resourceId,
 			RpcService rpcService,
 			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
 			SlotManagerFactory slotManagerFactory,
 			MetricRegistry metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
@@ -128,6 +141,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 		super(rpcService);
 
+		this.resourceId = checkNotNull(resourceId);
 		this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
 		this.slotManagerFactory = checkNotNull(slotManagerFactory);
@@ -135,6 +149,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		this.jobLeaderIdService = checkNotNull(jobLeaderIdService);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 
+		this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
+				resourceId,
+				new TaskManagerHeartbeatListener(),
+				rpcService.getScheduledExecutor(),
+				log);
+
 		this.jobManagerRegistrations = new HashMap<>(4);
 		this.taskExecutors = new HashMap<>(8);
 		this.leaderSessionId = null;
@@ -178,6 +198,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	public void shutDown() throws Exception {
 		Exception exception = null;
 
+		taskManagerHeartbeatManager.stop();
+
 		try {
 			super.shutDown();
 		} catch (Exception e) {
@@ -326,7 +348,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 *
 	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
 	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
-	 * @param resourceID               The resource ID of the TaskExecutor that registers
+	 * @param taskExecutorResourceId  The resource ID of the TaskExecutor that registers
 	 *
 	 * @return The response by the ResourceManager.
 	 */
@@ -334,7 +356,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	public Future<RegistrationResponse> registerTaskExecutor(
 		final UUID resourceManagerLeaderId,
 		final String taskExecutorAddress,
-		final ResourceID resourceID,
+		final ResourceID taskExecutorResourceId,
 		final SlotReport slotReport) {
 
 		if (leaderSessionId.equals(resourceManagerLeaderId)) {
@@ -342,25 +364,37 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 			return taskExecutorGatewayFuture.handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
 				@Override
-				public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+				public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
 					if (throwable != null) {
 						return new RegistrationResponse.Decline(throwable.getMessage());
 					} else {
-						WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(resourceID);
+						WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
 						if (oldRegistration != null) {
 							// TODO :: suggest old taskExecutor to stop itself
-							log.info("Replacing old instance of worker for ResourceID {}", resourceID);
+							log.info("Replacing old instance of worker for ResourceID {}", taskExecutorResourceId);
 						}
 
-						WorkerType newWorker = workerStarted(resourceID);
+						WorkerType newWorker = workerStarted(taskExecutorResourceId);
 						WorkerRegistration<WorkerType> registration =
 							new WorkerRegistration<>(taskExecutorGateway, newWorker);
 
-						taskExecutors.put(resourceID, registration);
-						slotManager.registerTaskExecutor(resourceID, registration, slotReport);
+						taskExecutors.put(taskExecutorResourceId, registration);
+						slotManager.registerTaskExecutor(taskExecutorResourceId, registration, slotReport);
+
+						taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
+							@Override
+							public void receiveHeartbeat(ResourceID resourceID, Void payload) {
+								// the task manager does not request heartbeats, so this method is currently never called
+							}
+
+							@Override
+							public void requestHeartbeat(ResourceID resourceID, Void payload) {
+								taskExecutorGateway.heartbeatFromResourceManager(resourceID);
+							}
+						});
 
 						return new TaskExecutorRegistrationSuccess(
-							registration.getInstanceID(),
+							registration.getInstanceID(), resourceId,
 							resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
 					}
 				}
@@ -368,7 +402,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		} else {
 			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did " +
 					"not equal the received leader session ID  {}",
-				resourceID, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId);
+				taskExecutorResourceId, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId);
 
 			return FlinkCompletableFuture.<RegistrationResponse>completed(
 				new RegistrationResponse.Decline("Discard registration because the leader id " +
@@ -377,6 +411,16 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		}
 	}
 
+	@RpcMethod
+	public void heartbeatFromTaskManager(final ResourceID resourceID) {
+		taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
+	}
+
+	@RpcMethod
+	public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) {
+		closeTaskManagerConnection(resourceId, cause);
+	}
+
 	/**
 	 * Requests a slot from the resource manager.
 	 *
@@ -716,24 +760,24 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 * This method should be called by the framework once it detects that a currently registered
 	 * task executor has failed.
 	 *
-	 * @param resourceID Id of the worker that has failed.
-	 * @param message An informational message that explains why the worker failed.
+	 * @param resourceID Id of the TaskManager that has failed.
+	 * @param cause The exception which caused the TaskManager to fail.
 	 */
-	public void notifyWorkerFailed(final ResourceID resourceID, final String message) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
+	public void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) {
+		taskManagerHeartbeatManager.unmonitorTarget(resourceID);
 
-				if (workerRegistration != null) {
-					log.info("Task manager {} failed because {}.", resourceID, message);
-					// TODO :: suggest failed task executor to stop itself
-					slotManager.notifyTaskManagerFailure(resourceID);
-				} else {
-					log.debug("Could not find a registered task manager with the process id {}.", resourceID);
-				}
-			}
-		});
+		WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
+
+		if (workerRegistration != null) {
+			log.info("Task manager {} failed because {}.", resourceID, cause);
+
+			// TODO :: suggest failed task executor to stop itself
+			slotManager.notifyTaskManagerFailure(resourceID);
+
+			workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
+		} else {
+			log.debug("Could not find a registered task manager with the process id {}.", resourceID);
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -827,5 +871,26 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			onFatalErrorAsync(error);
 		}
 	}
+
+	private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
+
+		@Override
+		public void notifyHeartbeatTimeout(ResourceID resourceID) {
+			log.info("The heartbeat of TaskManager with id {} timed out.", resourceID);
+
+			closeTaskManagerConnection(resourceID, new TimeoutException(
+					"Task manager with id " + resourceID + " heartbeat timed out."));
+		}
+
+		@Override
+		public void reportPayload(ResourceID resourceID, Void payload) {
+			// nothing to do since there is no payload
+		}
+
+		@Override
+		public Future<Void> retrievePayload() {
+			return FlinkCompletableFuture.completed(null);
+		}
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 8235ea7..7741e0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -130,4 +130,20 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @return The future to the number of registered TaskManagers.
 	 */
 	Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);
+
+	/**
+	 * Sends a heartbeat from the task manager to the resource manager.
+	 *
+	 * @param resourceID unique id of the task manager
+	 */
+	void heartbeatFromTaskManager(final ResourceID resourceID);
+
+	/**
+	 * Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
+	 * {@link ResourceManager}.
+	 *
+	 * @param resourceID identifying the TaskManager to disconnect
+	 * @param cause for the disconnection of the TaskManager
+	 */
+	void disconnectTaskManager(ResourceID resourceID, Exception cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 73b27b5..d07e373 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -43,14 +45,18 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 	private final ResourceManager<?> resourceManager;
 
 	public ResourceManagerRunner(
+			final ResourceID resourceId,
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices highAvailabilityServices,
+			final HeartbeatServices heartbeatServices,
 			final MetricRegistry metricRegistry) throws Exception {
 
+		Preconditions.checkNotNull(resourceId);
 		Preconditions.checkNotNull(configuration);
 		Preconditions.checkNotNull(rpcService);
 		Preconditions.checkNotNull(highAvailabilityServices);
+		Preconditions.checkNotNull(heartbeatServices);
 		Preconditions.checkNotNull(metricRegistry);
 
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
@@ -63,9 +69,11 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 			rpcService.getScheduledExecutor());
 
 		this.resourceManager = new StandaloneResourceManager(
+			resourceId,
 			rpcService,
 			resourceManagerConfiguration,
 			highAvailabilityServices,
+			heartbeatServices,
 			resourceManagerRuntimeServices.getSlotManagerFactory(),
 			metricRegistry,
 			resourceManagerRuntimeServices.getJobLeaderIdService(),
@@ -87,7 +95,6 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 	private void shutDownInternally() throws Exception {
 		Exception exception = null;
 		synchronized (lock) {
-
 			try {
 				resourceManager.shutDown();
 			} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 73c8a2d..e2d6538 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -37,17 +38,21 @@ import org.apache.flink.runtime.rpc.RpcService;
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
 	public StandaloneResourceManager(
+			ResourceID resourceId,
 			RpcService rpcService,
 			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
 			SlotManagerFactory slotManagerFactory,
 			MetricRegistry metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
 		super(
+			resourceId,
 			rpcService,
 			resourceManagerConfiguration,
 			highAvailabilityServices,
+			heartbeatServices,
 			slotManagerFactory,
 			metricRegistry,
 			jobLeaderIdService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 83c225f..f3e1ff3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -57,6 +57,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationConnectionListener;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -135,6 +136,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	/** The heartbeat manager for job manager in the task manager */
 	private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
+	/** The heartbeat manager for resource manager in the task manager */
+	private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
+
 	/** The fatal error handler to use in case of a fatal error */
 	private final FatalErrorHandler fatalErrorHandler;
 
@@ -206,6 +210,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			new JobManagerHeartbeatListener(),
 			rpcService.getScheduledExecutor(),
 			log);
+
+		this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
+				getResourceID(),
+				new ResourceManagerHeartbeatListener(),
+				rpcService.getScheduledExecutor(),
+				log);
 	}
 
 	// ------------------------------------------------------------------------
@@ -247,6 +257,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		jobManagerHeartbeatManager.stop();
 
+		resourceManagerHeartbeatManager.stop();
+
 		ioManager.shutdown();
 
 		memoryManager.shutdown();
@@ -497,6 +509,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
 	}
 
+	@RpcMethod
+	public void heartbeatFromResourceManager(ResourceID resourceID) {
+		resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
+	}
+
 	// ----------------------------------------------------------------------
 	// Checkpointing RPCs
 	// ----------------------------------------------------------------------
@@ -619,11 +636,20 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		return new TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId);
 	}
 
+	// ----------------------------------------------------------------------
+	// Disconnection RPCs
+	// ----------------------------------------------------------------------
+
 	@RpcMethod
 	public void disconnectJobManager(JobID jobId, Exception cause) {
 		closeJobManagerConnection(jobId, cause);
 	}
 
+	@RpcMethod
+	public void disconnectResourceManager(Exception cause) {
+		closeResourceManagerConnection(cause);
+	}
+
 	// ======================================================================
 	//  Internal methods
 	// ======================================================================
@@ -665,11 +691,25 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 					newLeaderAddress,
 					newLeaderId,
 					getMainThreadExecutor(),
-					new ForwardingFatalErrorHandler());
+					new ResourceManagerRegistrationListener());
 			resourceManagerConnection.start();
 		}
 	}
 
+	private void closeResourceManagerConnection(Exception cause) {
+		log.info("Close ResourceManager connection for {}.", cause);
+
+		if (isConnectedToResourceManager()) {
+			resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
+
+			ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+			resourceManagerConnection.close();
+			resourceManagerConnection = null;
+
+			resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Internal job manager connection methods
 	// ------------------------------------------------------------------------
@@ -747,10 +787,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 							offerSlotsToJobManager(jobId);
 						} else {
 							log.warn("Slot offering to JobManager failed. Freeing the slots " +
-								"and returning them to the ResourceManager.", throwable);
+									"and returning them to the ResourceManager.", throwable);
 
 							// We encountered an exception. Free the slots and return them to the RM.
-							for (SlotOffer reservedSlot: reservedSlots) {
+							for (SlotOffer reservedSlot : reservedSlots) {
 								freeSlot(reservedSlot.getAllocationId(), throwable);
 							}
 						}
@@ -1137,11 +1177,32 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	private final class ForwardingFatalErrorHandler implements FatalErrorHandler {
+	private final class ResourceManagerRegistrationListener implements RegistrationConnectionListener<TaskExecutorRegistrationSuccess> {
 
 		@Override
-		public void onFatalError(Throwable exception) {
-			onFatalErrorAsync(exception);
+		public void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
+			final ResourceID resourceManagerId = success.getResourceManagerId();
+
+			// monitor the resource manager as heartbeat target
+			resourceManagerHeartbeatManager.monitorTarget(resourceManagerId, new HeartbeatTarget<Void>() {
+				@Override
+				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
+					if (isConnectedToResourceManager()) {
+						ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+						resourceManagerGateway.heartbeatFromTaskManager(resourceID);
+					}
+				}
+
+				@Override
+				public void requestHeartbeat(ResourceID resourceID, Void payload) {
+					// request heartbeat will never be called on the task manager side
+				}
+			});
+		}
+
+		@Override
+		public void onRegistrationFailure(Throwable failure) {
+			onFatalErrorAsync(failure);
 		}
 	}
 
@@ -1216,15 +1277,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			runAsync(new Runnable() {
 				@Override
 				public void run() {
-					log.info("The JobManager connection {} has timed out.", resourceID);
+					log.info("Job manager with id {} heartbeat timed out.", resourceID);
 
 					if (jobManagerConnections.containsKey(resourceID)) {
 						JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
 						if (jobManagerConnection != null) {
 							closeJobManagerConnection(
 								jobManagerConnection.getJobID(),
-								new TimeoutException("The heartbeat of JobManager with id " +
-									resourceID + " timed out."));
+								new TimeoutException("Job manager with id " + resourceID + " heartbeat timed out."));
 						}
 					}
 				}
@@ -1241,4 +1301,32 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			return FlinkCompletableFuture.completed(null);
 		}
 	}
+
+	/** Heartbeat listener for the ResourceManager: closes the ResourceManager connection on heartbeat timeout; payloads are Void. */
+	private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
+		@Override
+		public void notifyHeartbeatTimeout(final ResourceID resourceID) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.info("Resource manager with id {} heartbeat timed out.", resourceID);
+					// only close the connection if the timed-out id belongs to the currently connected resource manager
+					if (isConnectedToResourceManager() && resourceManagerConnection.getResourceManagerId().equals(resourceID)) {
+						closeResourceManagerConnection(
+								new TimeoutException("Resource manager with id " + resourceID + " heartbeat timed out."));
+					}
+				}
+			});
+		}
+
+		@Override
+		public void reportPayload(ResourceID resourceID, Void payload) {
+			// nothing to do since the payload is of type Void
+		}
+
+		@Override
+		public Future<Void> retrievePayload() {
+			return FlinkCompletableFuture.completed(null);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 2dcc3a4..2bbf0e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -141,10 +141,24 @@ public interface TaskExecutorGateway extends RpcGateway {
 	void heartbeatFromJobManager(ResourceID heartbeatOrigin);
 
 	/**
+	 * Heartbeat request from the resource manager.
+	 *
+	 * @param heartbeatOrigin unique id of the resource manager
+	 */
+	void heartbeatFromResourceManager(ResourceID heartbeatOrigin);
+
+	/**
 	 * Disconnects the given JobManager from the TaskManager.
 	 *
 	 * @param jobId JobID for which the JobManager was the leader
 	 * @param cause for the disconnection from the JobManager
 	 */
 	void disconnectJobManager(JobID jobId, Exception cause);
+
+	/**
+	 * Disconnects the ResourceManager from the TaskManager.
+	 *
+	 * @param cause for the disconnection from the ResourceManager
+	 */
+	void disconnectResourceManager(Exception cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
index b357f52..4b61f68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 
@@ -33,16 +34,20 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse.
 
 	private final InstanceID registrationId;
 
+	private final ResourceID resourceManagerResourceId;
+
 	private final long heartbeatInterval;
 
 	/**
 	 * Create a new {@code TaskExecutorRegistrationSuccess} message.
 	 * 
 	 * @param registrationId     The ID that the ResourceManager assigned the registration.
+	 * @param resourceManagerResourceId The unique ID that identifies the ResourceManager.
 	 * @param heartbeatInterval  The interval in which the ResourceManager will heartbeat the TaskExecutor.
 	 */
-	public TaskExecutorRegistrationSuccess(InstanceID registrationId, long heartbeatInterval) {
+	public TaskExecutorRegistrationSuccess(InstanceID registrationId, ResourceID resourceManagerResourceId, long heartbeatInterval) {
 		this.registrationId = registrationId;
+		this.resourceManagerResourceId = resourceManagerResourceId;
 		this.heartbeatInterval = heartbeatInterval;
 	}
 
@@ -54,6 +59,13 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse.
 	}
 
 	/**
+	 * Gets the unique ID that identifies the ResourceManager.
+	 */
+	public ResourceID getResourceManagerId() {
+		return resourceManagerResourceId;
+	}
+
+	/**
 	 * Gets the interval in which the ResourceManager will heartbeat the TaskExecutor.
 	 */
 	public long getHeartbeatInterval() {
@@ -62,7 +74,7 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse.
 
 	@Override
 	public String toString() {
-		return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + heartbeatInterval + ')';
+		return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + resourceManagerResourceId + " / " + heartbeatInterval + ')';
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 6e3e39b..775482c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.registration.RegistrationConnectionListener;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
@@ -51,10 +51,12 @@ public class TaskExecutorToResourceManagerConnection
 
 	private final SlotReport slotReport;
 
-	private final FatalErrorHandler fatalErrorHandler;
+	private final RegistrationConnectionListener<TaskExecutorRegistrationSuccess> registrationListener;
 
 	private InstanceID registrationId;
 
+	private ResourceID resourceManagerResourceId;
+
 	public TaskExecutorToResourceManagerConnection(
 			Logger log,
 			RpcService rpcService,
@@ -64,7 +66,7 @@ public class TaskExecutorToResourceManagerConnection
 			String resourceManagerAddress,
 			UUID resourceManagerLeaderId,
 			Executor executor,
-			FatalErrorHandler fatalErrorHandler) {
+			RegistrationConnectionListener<TaskExecutorRegistrationSuccess> registrationListener) {
 
 		super(log, resourceManagerAddress, resourceManagerLeaderId, executor);
 
@@ -72,7 +74,7 @@ public class TaskExecutorToResourceManagerConnection
 		this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
 		this.taskManagerResourceId = Preconditions.checkNotNull(taskManagerResourceId);
 		this.slotReport = Preconditions.checkNotNull(slotReport);
-		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
+		this.registrationListener = Preconditions.checkNotNull(registrationListener);
 	}
 
 
@@ -94,13 +96,15 @@ public class TaskExecutorToResourceManagerConnection
 			getTargetAddress(), success.getRegistrationId());
 
 		registrationId = success.getRegistrationId();
+		resourceManagerResourceId = success.getResourceManagerId();
+		registrationListener.onRegistrationSuccess(success);
 	}
 
 	@Override
 	protected void onRegistrationFailure(Throwable failure) {
 		log.info("Failed to register at resource manager {}.", getTargetAddress(), failure);
 
-		fatalErrorHandler.onFatalError(failure);
+		registrationListener.onRegistrationFailure(failure);
 	}
 
 	/**
@@ -111,6 +115,13 @@ public class TaskExecutorToResourceManagerConnection
 		return registrationId;
 	}
 
+	/**
+	 * Gets the unique ID of the ResourceManager, which is returned when the registration succeeds.
+	 */
+	public ResourceID getResourceManagerId() {
+		return resourceManagerResourceId;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index ca8a07a..e7f2439 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -30,21 +31,47 @@ import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
 import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
 import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.TestingSlotManagerFactory;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import scala.Option;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * General tests for the resource manager component.
@@ -335,4 +362,85 @@ public class ResourceManagerTest {
 		}};
 		}};
 	}
+
+	@Test
+	public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
+		final String taskManagerAddress = "tm";
+		final ResourceID taskManagerResourceID = new ResourceID(taskManagerAddress);
+		final ResourceID resourceManagerResourceID = ResourceID.generate();
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+
+		final TestingSerialRpcService rpcService = new TestingSerialRpcService();
+		rpcService.registerGateway(taskManagerAddress, taskExecutorGateway);
+
+		final ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
+			Time.seconds(5L),
+			Time.seconds(5L));
+
+		final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+
+		final long heartbeatInterval = 1L;
+		final long heartbeatTimeout = 5L;
+		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
+
+		final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+		final MetricRegistry metricRegistry = mock(MetricRegistry.class);
+		final JobLeaderIdService jobLeaderIdService = mock(JobLeaderIdService.class);
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		try {
+			final StandaloneResourceManager resourceManager = new StandaloneResourceManager(
+				resourceManagerResourceID,
+				rpcService,
+				resourceManagerConfiguration,
+				highAvailabilityServices,
+				heartbeatServices,
+				slotManagerFactory,
+				metricRegistry,
+				jobLeaderIdService,
+				testingFatalErrorHandler);
+
+			resourceManager.start();
+
+			final UUID rmLeaderSessionId = UUID.randomUUID();
+			rmLeaderElectionService.isLeader(rmLeaderSessionId);
+
+			final SlotReport slotReport = new SlotReport();
+			// a successful registration triggers monitoring of the heartbeat target and schedules heartbeat requests at the heartbeat interval
+			Future<RegistrationResponse> successfulFuture =
+					resourceManager.registerTaskExecutor(rmLeaderSessionId, taskManagerAddress, taskManagerResourceID, slotReport);
+			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+			ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+			verify(scheduledExecutor, times(1)).scheduleAtFixedRate(
+				heartbeatRunnableCaptor.capture(),
+				eq(0L),
+				eq(heartbeatInterval),
+				eq(TimeUnit.MILLISECONDS));
+
+			Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue();
+
+			ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+			verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
+
+			Runnable timeoutRunnable = timeoutRunnableCaptor.getValue();
+
+			// run the first heartbeat request
+			heartbeatRunnable.run();
+
+			verify(taskExecutorGateway, times(1)).heartbeatFromResourceManager(eq(resourceManagerResourceID));
+
+			// run the timeout runnable to simulate a heartbeat timeout
+			timeoutRunnable.run();
+
+			verify(taskExecutorGateway).disconnectResourceManager(any(TimeoutException.class));
+
+		} finally {
+			rpcService.stopService();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 43536b6..73da244 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -26,10 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.heartbeat.HeartbeatListener;
-import org.apache.flink.runtime.heartbeat.HeartbeatManager;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.*;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -38,14 +35,12 @@ import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
 
 import java.net.InetAddress;
 import java.net.URL;
@@ -108,10 +103,9 @@ public class JobMasterTest extends TestLogger {
 				testingFatalErrorHandler,
 				new FlinkUserCodeClassLoader(new URL[0]));
 
-			// also start the heartbeat manager in job manager
 			jobMaster.start(jmLeaderId);
 
-			// register task manager will trigger monitoring heartbeat target, schedule heartbeat request in interval time
+			// registering the task manager triggers monitoring of the heartbeat target and schedules heartbeat requests at the heartbeat interval
 			jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId);
 
 			ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -145,32 +139,4 @@ public class JobMasterTest extends TestLogger {
 			rpc.stopService();
 		}
 	}
-
-	private static class TestingHeartbeatServices extends HeartbeatServices {
-
-		private final ScheduledExecutor scheduledExecutorToUse;
-
-		public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
-			super(heartbeatInterval, heartbeatTimeout);
-
-			this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse);
-		}
-
-		@Override
-		public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
-			ResourceID resourceId,
-			HeartbeatListener<I, O> heartbeatListener,
-			ScheduledExecutor scheduledExecutor,
-			Logger log) {
-
-			return new HeartbeatManagerSenderImpl<>(
-				heartbeatInterval,
-				heartbeatTimeout,
-				resourceId,
-				heartbeatListener,
-				org.apache.flink.runtime.concurrent.Executors.directExecutor(),
-				scheduledExecutorToUse,
-				log);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 1aa799b..39594df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -40,12 +42,15 @@ public class ResourceManagerHATest {
 
 	@Test
 	public void testGrantAndRevokeLeadership() throws Exception {
+		ResourceID rmResourceId = ResourceID.generate();
 		RpcService rpcService = new TestingSerialRpcService();
 
 		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
+		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.seconds(5L),
 			Time.seconds(5L));
@@ -63,9 +68,11 @@ public class ResourceManagerHATest {
 
 		final ResourceManager resourceManager =
 			new StandaloneResourceManager(
+				rmResourceId,
 				rpcService,
 				resourceManagerConfiguration,
 				highAvailabilityServices,
+				heartbeatServices,
 				slotManagerFactory,
 				metricRegistry,
 				resourceManagerRuntimeServices.getJobLeaderIdService(),

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 9a68eca..0401f9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -196,10 +198,13 @@ public class ResourceManagerJobMasterTest {
 			JobID jobID,
 			TestingLeaderRetrievalService jobMasterLeaderRetrievalService,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
+		ResourceID rmResourceId = ResourceID.generate();
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
 		highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
 
+		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.seconds(5L),
 			Time.seconds(5L));
@@ -211,9 +216,11 @@ public class ResourceManagerJobMasterTest {
 			Time.minutes(5L));
 
 		ResourceManager resourceManager = new StandaloneResourceManager(
+			rmResourceId,
 			rpcService,
 			resourceManagerConfiguration,
 			highAvailabilityServices,
+			heartbeatServices,
 			slotManagerFactory,
 			metricRegistry,
 			jobLeaderIdService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 0a1addb..7c811d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -52,6 +53,8 @@ public class ResourceManagerTaskExecutorTest {
 
 	private ResourceID taskExecutorResourceID;
 
+	private ResourceID resourceManagerResourceID;
+
 	private StandaloneResourceManager resourceManager;
 
 	private UUID leaderSessionId;
@@ -63,6 +66,7 @@ public class ResourceManagerTaskExecutorTest {
 		rpcService = new TestingSerialRpcService();
 
 		taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+		resourceManagerResourceID = ResourceID.generate();
 		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
 		testingFatalErrorHandler = new TestingFatalErrorHandler();
 		resourceManager = createAndStartResourceManager(rmLeaderElectionService, testingFatalErrorHandler);
@@ -144,6 +148,7 @@ public class ResourceManagerTaskExecutorTest {
 
 	private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
 		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
@@ -158,9 +163,11 @@ public class ResourceManagerTaskExecutorTest {
 
 		StandaloneResourceManager resourceManager =
 			new StandaloneResourceManager(
+				resourceManagerResourceID,
 				rpcService,
 				resourceManagerConfiguration,
 				highAvailabilityServices,
+				heartbeatServices,
 				slotManagerFactory,
 				metricRegistry,
 				jobLeaderIdService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index ea660f8..28ed697 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -98,6 +99,7 @@ public class SlotProtocolTest extends TestLogger {
 		final String rmAddress = "/rm1";
 		final String jmAddress = "/jm1";
 		final JobID jobID = new JobID();
+		final ResourceID rmResourceId = new ResourceID(rmAddress);
 
 		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
 
@@ -117,11 +119,16 @@ public class SlotProtocolTest extends TestLogger {
 			Time.seconds(5L));
 
 		final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+
+		final HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+
 		SpiedResourceManager resourceManager =
 			new SpiedResourceManager(
+				rmResourceId,
 				testRpcService,
 				resourceManagerConfiguration,
 				testingHaServices,
+				heartbeatServices,
 				slotManagerFactory,
 				mock(MetricRegistry.class),
 				jobLeaderIdService,
@@ -198,6 +205,7 @@ public class SlotProtocolTest extends TestLogger {
 		final String jmAddress = "/jm1";
 		final String tmAddress = "/tm1";
 		final JobID jobID = new JobID();
+		final ResourceID rmResourceId = new ResourceID(rmAddress);
 
 		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
 
@@ -224,11 +232,16 @@ public class SlotProtocolTest extends TestLogger {
 			Time.seconds(5L));
 
 		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+
+		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+
 		ResourceManager<ResourceID> resourceManager =
 			Mockito.spy(new StandaloneResourceManager(
+				rmResourceId,
 				testRpcService,
 				resourceManagerConfiguration,
 				testingHaServices,
+				heartbeatServices,
 				slotManagerFactory,
 				mock(MetricRegistry.class),
 				jobLeaderIdService,
@@ -302,17 +315,21 @@ public class SlotProtocolTest extends TestLogger {
 		private int startNewWorkerCalled = 0;
 
 		public SpiedResourceManager(
+				ResourceID resourceId,
 				RpcService rpcService,
 				ResourceManagerConfiguration resourceManagerConfiguration,
 				HighAvailabilityServices highAvailabilityServices,
+				HeartbeatServices heartbeatServices,
 				SlotManagerFactory slotManagerFactory,
 				MetricRegistry metricRegistry,
 				JobLeaderIdService jobLeaderIdService,
 				FatalErrorHandler fatalErrorHandler) {
 			super(
+				resourceId,
 				rpcService,
 				resourceManagerConfiguration,
 				highAvailabilityServices,
+				heartbeatServices,
 				slotManagerFactory,
 				metricRegistry,
 				jobLeaderIdService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index f6c2dce..4e76486 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -87,6 +87,7 @@ public class TaskExecutorITCase {
 		final String rmAddress = "rm";
 		final String jmAddress = "jm";
 		final UUID jmLeaderId = UUID.randomUUID();
+		final ResourceID rmResourceId = new ResourceID(rmAddress);
 		final JobID jobId = new JobID();
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 
@@ -119,9 +120,11 @@ public class TaskExecutorITCase {
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 
 		ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
+			rmResourceId,
 			rpcService,
 			resourceManagerConfiguration,
 			testingHAServices,
+			heartbeatServices,
 			slotManagerFactory,
 			metricRegistry,
 			jobLeaderIdService,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 67196aa..d1f6e2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -213,9 +213,104 @@ public class TaskExecutorTest extends TestLogger {
 	}
 
 	@Test
+	public void testHeartbeatTimeoutWithResourceManager() throws Exception {
+		final String rmAddress = "rm";
+		final String tmAddress = "tm";
+		final ResourceID rmResourceId = new ResourceID(rmAddress);
+		final ResourceID tmResourceId = new ResourceID(tmAddress);
+		final UUID rmLeaderId = UUID.randomUUID();
+
+		// mock resource manager gateway: every registerTaskExecutor call succeeds and reports rmResourceId
+		ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
+		when(rmGateway.registerTaskExecutor(
+			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+			.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
+					new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L)));
+
+		final TestingSerialRpcService rpc = new TestingSerialRpcService();
+		rpc.registerGateway(rmAddress, rmGateway);
+
+		final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService();
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		haServices.setResourceManagerLeaderRetriever(testLeaderService);
+
+		final TaskManagerConfiguration taskManagerConfiguration = mock(TaskManagerConfiguration.class);
+		when(taskManagerConfiguration.getNumberSlots()).thenReturn(1);
+
+		final TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+		when(taskManagerLocation.getResourceID()).thenReturn(tmResourceId);
+
+		final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+		final SlotReport slotReport = new SlotReport();
+		when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		final long heartbeatTimeout = 10L; // presumably milliseconds -- TODO confirm against HeartbeatManagerImpl
+		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); // stubbed below to build a real HeartbeatManagerImpl with heartbeatTimeout
+		when(heartbeatServices.createHeartbeatManager(
+			eq(taskManagerLocation.getResourceID()),
+			any(HeartbeatListener.class),
+			any(ScheduledExecutor.class),
+			any(Logger.class))).thenAnswer(
+			new Answer<HeartbeatManagerImpl<Void, Void>>() {
+				@Override
+				public HeartbeatManagerImpl<Void, Void> answer(InvocationOnMock invocation) throws Throwable {
+					return new HeartbeatManagerImpl<>(
+						heartbeatTimeout,
+						taskManagerLocation.getResourceID(),
+						(HeartbeatListener<Void, Void>)invocation.getArguments()[1],
+						(Executor)invocation.getArguments()[2], // argument [2] is passed twice: once as Executor ...
+						(ScheduledExecutor)invocation.getArguments()[2], // ... and once as ScheduledExecutor
+						(Logger)invocation.getArguments()[3]);
+					}
+				}
+		);
+
+		try {
+			final TaskExecutor taskManager = new TaskExecutor(
+				taskManagerConfiguration,
+				taskManagerLocation,
+				rpc,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				mock(NetworkEnvironment.class),
+				haServices,
+				heartbeatServices,
+				mock(MetricRegistry.class),
+				mock(TaskManagerMetricGroup.class),
+				mock(BroadcastVariableManager.class),
+				mock(FileCache.class),
+				taskSlotTable,
+				mock(JobManagerTable.class),
+				mock(JobLeaderService.class),
+				testingFatalErrorHandler);
+
+			taskManager.start();
+
+			// announce the resource manager leader and check that a registration happens
+			testLeaderService.notifyListener(rmAddress, rmLeaderId);
+
+			// a successful registration with the resource manager triggers heartbeat monitoring between TM and RM
+			verify(rmGateway).registerTaskExecutor(
+					eq(rmLeaderId), eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), any(Time.class));
+
+			// the heartbeat timeout should make the TaskManager disconnect from the ResourceManager (wait up to 5x the timeout)
+			verify(rmGateway, timeout(heartbeatTimeout * 5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
+
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowError();
+
+		} finally {
+			rpc.stopService();
+		}
+	}
+
+	@Test
 	public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
 		final ResourceID resourceID = ResourceID.generate();
 		final String resourceManagerAddress = "/resource/manager/address/one";
+		final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
 
 		final TestingSerialRpcService rpc = new TestingSerialRpcService();
 		try {
@@ -223,7 +318,8 @@ public class TaskExecutorTest extends TestLogger {
 			ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
 			when(rmGateway.registerTaskExecutor(
 					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-				.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(new InstanceID(), 10L)));
+				.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(
+					new InstanceID(), resourceManagerResourceId, 10L)));
 
 			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
 			when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
@@ -275,12 +371,14 @@ public class TaskExecutorTest extends TestLogger {
 
 	@Test
 	public void testTriggerRegistrationOnLeaderChange() throws Exception {
-		final ResourceID resourceID = ResourceID.generate();
+		final ResourceID tmResourceID = ResourceID.generate();
 
 		final String address1 = "/resource/manager/address/one";
 		final String address2 = "/resource/manager/address/two";
 		final UUID leaderId1 = UUID.randomUUID();
 		final UUID leaderId2 = UUID.randomUUID();
+		final ResourceID rmResourceId1 = new ResourceID(address1);
+		final ResourceID rmResourceId2 = new ResourceID(address2);
 
 		final TestingSerialRpcService rpc = new TestingSerialRpcService();
 		try {
@@ -291,11 +389,11 @@ public class TaskExecutorTest extends TestLogger {
 			when(rmGateway1.registerTaskExecutor(
 					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
 					.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
-						new TaskExecutorRegistrationSuccess(new InstanceID(), 10L)));
+						new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
 			when(rmGateway2.registerTaskExecutor(
 					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
 					.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
-						new TaskExecutorRegistrationSuccess(new InstanceID(), 10L)));
+						new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));
 
 			rpc.registerGateway(address1, rmGateway1);
 			rpc.registerGateway(address2, rmGateway2);
@@ -313,7 +411,7 @@ public class TaskExecutorTest extends TestLogger {
 			when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]);
 
 			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
-			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+			when(taskManagerLocation.getResourceID()).thenReturn(tmResourceID);
 			when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
 			final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
@@ -350,7 +448,7 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(address1, leaderId1);
 
 			verify(rmGateway1).registerTaskExecutor(
-					eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+					eq(leaderId1), eq(taskManagerAddress), eq(tmResourceID), any(SlotReport.class), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 
 			// cancel the leader 
@@ -360,7 +458,7 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(address2, leaderId2);
 
 			verify(rmGateway2).registerTaskExecutor(
-					eq(leaderId2), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
+					eq(leaderId2), eq(taskManagerAddress), eq(tmResourceID), eq(slotReport), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 
 			// check if a concurrent error occurred
@@ -531,6 +629,7 @@ public class TaskExecutorTest extends TestLogger {
 
 		final String resourceManagerAddress = "rm";
 		final UUID resourceManagerLeaderId = UUID.randomUUID();
+		final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
 
 		final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
 		final InstanceID registrationId = new InstanceID();
@@ -540,7 +639,7 @@ public class TaskExecutorTest extends TestLogger {
 			any(String.class),
 			eq(resourceId),
 			any(SlotReport.class),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+			any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
 
 		final String jobManagerAddress = "jm";
 		final UUID jobManagerLeaderId = UUID.randomUUID();
@@ -638,6 +737,7 @@ public class TaskExecutorTest extends TestLogger {
 
 		final String resourceManagerAddress = "rm";
 		final UUID resourceManagerLeaderId = UUID.randomUUID();
+		final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
 
 		final String jobManagerAddress = "jm";
 		final UUID jobManagerLeaderId = UUID.randomUUID();
@@ -655,7 +755,7 @@ public class TaskExecutorTest extends TestLogger {
 			any(String.class),
 			eq(resourceId),
 			any(SlotReport.class),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+			any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
 
 		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
 		final int blobPort = 42;
@@ -844,6 +944,7 @@ public class TaskExecutorTest extends TestLogger {
 
 		final String resourceManagerAddress = "rm";
 		final UUID resourceManagerLeaderId = UUID.randomUUID();
+		final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
 
 		final String jobManagerAddress = "jm";
 		final UUID jobManagerLeaderId = UUID.randomUUID();
@@ -862,7 +963,7 @@ public class TaskExecutorTest extends TestLogger {
 			eq(resourceId),
 			any(SlotReport.class),
 			any(Time.class))).thenReturn(
-				FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+				FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
 
 		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
 		final int blobPort = 42;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 7a0dbbe..ed672a3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -198,15 +198,18 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 			haServices,
 			commonRpcService.getScheduledExecutor());
 
-		return new YarnResourceManager(config,
-				ENV,
-				commonRpcService,
-				resourceManagerConfiguration,
-				haServices,
-				resourceManagerRuntimeServices.getSlotManagerFactory(),
-				metricRegistry,
-				resourceManagerRuntimeServices.getJobLeaderIdService(),
-				this);
+		return new YarnResourceManager(
+			ResourceID.generate(),
+			config,
+			ENV,
+			commonRpcService,
+			resourceManagerConfiguration,
+			haServices,
+			heartbeatServices,
+			resourceManagerRuntimeServices.getSlotManagerFactory(),
+			metricRegistry,
+			resourceManagerRuntimeServices.getJobLeaderIdService(),
+			this);
 	}
 
 	private JobManagerRunner createJobManagerRunner(Configuration config) throws Exception{

http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index ab96441..a308079 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
@@ -106,19 +107,23 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 	final private Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();
 
 	public YarnResourceManager(
+			ResourceID resourceId,
 			Configuration flinkConfig,
 			Map<String, String> env,
 			RpcService rpcService,
 			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
 			SlotManagerFactory slotManagerFactory,
 			MetricRegistry metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
 		super(
+			resourceId,
 			rpcService,
 			resourceManagerConfiguration,
 			highAvailabilityServices,
+			heartbeatServices,
 			slotManagerFactory,
 			metricRegistry,
 			jobLeaderIdService,
@@ -231,7 +236,8 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 	public void onContainersCompleted(List<ContainerStatus> list) {
 		for (ContainerStatus container : list) {
 			if (container.getExitStatus() < 0) {
-				notifyWorkerFailed(new ResourceID(container.getContainerId().toString()), container.getDiagnostics());
+				closeTaskManagerConnection(new ResourceID(
+					container.getContainerId().toString()), new Exception(container.getDiagnostics())); // negative exit status: close the connection keyed by the container id, diagnostics as cause; NOTE(review): raw Exception, consider a more specific type
 			}
 		}
 	}