You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/03/17 16:00:24 UTC

[2/2] flink git commit: [FLINK-4364] Introduce HeartbeatServices for the HeartbeatManager instantiation

[FLINK-4364] Introduce HeartbeatServices for the HeartbeatManager instantiation

The HeartbeatServices are used to create all services relevant for heartbeating. At the
moment, this includes the creation of HeartbeatManager implementations which actively
send heartbeats and those which only respond to heartbeat requests.

Add comments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97ccc147
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97ccc147
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97ccc147

Branch: refs/heads/master
Commit: 97ccc147382d866bc3e82caf9b87f7cd2e5989f9
Parents: 0b3d5c2
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Mar 7 22:45:42 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Mar 17 17:00:05 2017 +0100

----------------------------------------------------------------------
 .../configuration/HeartbeatManagerOptions.java  |   2 +-
 .../runtime/heartbeat/HeartbeatServices.java    | 116 ++++++++++++++++++
 .../heartbeat/TestingHeartbeatManagerImpl.java  |   3 +-
 .../TestingHeartbeatManagerSenderImpl.java      |   3 +-
 .../runtime/jobmaster/JobManagerRunner.java     |  83 +++++++------
 .../flink/runtime/jobmaster/JobMaster.java      | 106 +++++++++-------
 .../runtime/jobmaster/JobMasterGateway.java     |   3 +-
 .../flink/runtime/minicluster/MiniCluster.java  |  14 ++-
 .../minicluster/MiniClusterJobDispatcher.java   |  24 +++-
 .../runtime/taskexecutor/TaskExecutor.java      | 105 +++++++++-------
 .../taskexecutor/TaskExecutorGateway.java       |  14 ++-
 .../runtime/taskexecutor/TaskManagerRunner.java |  35 +++---
 .../jobmaster/JobManagerRunnerMockTest.java     |   6 +
 .../flink/runtime/jobmaster/JobMasterTest.java  | 120 ++++++++++++-------
 .../taskexecutor/TaskExecutorITCase.java        |   7 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 110 ++++++++---------
 .../yarn/YarnFlinkApplicationMasterRunner.java  |  20 +++-
 .../flink/yarn/YarnTaskExecutorRunner.java      |  11 +-
 18 files changed, 515 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
index 2258eb1..81cbc5d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
@@ -30,7 +30,7 @@ public class HeartbeatManagerOptions {
 
 	/** Time interval for requesting heartbeat from sender side */
 	public static final ConfigOption<Long> HEARTBEAT_INTERVAL =
-			key("heartbeat.sender.interval")
+			key("heartbeat.interval")
 			.defaultValue(10000L);
 
 	/** Timeout for requesting and receiving heartbeat for both sender and receiver sides */

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java
new file mode 100644
index 0000000..7d55b9c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+/**
+ * HeartbeatServices gives access to all services needed for heartbeating. This includes the
+ * creation of heartbeat receivers and heartbeat senders.
+ */
+public class HeartbeatServices {
+
+	/** Heartbeat interval for the created services */
+	protected final long heartbeatInterval;
+
+	/** Heartbeat timeout for the created services */
+	protected final long heartbeatTimeout;
+
+	/** Creates the services; requires {@code 0 < heartbeatInterval <= heartbeatTimeout}. */
+	public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
+		Preconditions.checkArgument(0L < heartbeatInterval, "The heartbeat interval must be larger than 0.");
+		Preconditions.checkArgument(heartbeatInterval <= heartbeatTimeout, "The heartbeat timeout should be larger or equal than the heartbeat interval.");
+		this.heartbeatInterval = heartbeatInterval;
+		this.heartbeatTimeout = heartbeatTimeout;
+	}
+
+	/**
+	 * Creates a heartbeat manager which does not actively send heartbeats.
+	 *
+	 * @param resourceId Resource Id which identifies the owner of the heartbeat manager
+	 * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
+	 *                          targets
+	 * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
+	 * @param log Logger to be used for the logging
+	 * @param <I> Type of the incoming payload
+	 * @param <O> Type of the outgoing payload
+	 * @return A new HeartbeatManager instance
+	 */
+	public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
+		ResourceID resourceId,
+		HeartbeatListener<I, O> heartbeatListener,
+		ScheduledExecutor scheduledExecutor,
+		Logger log) {
+
+		return new HeartbeatManagerImpl<>(
+			heartbeatTimeout,
+			resourceId,
+			heartbeatListener,
+			scheduledExecutor,
+			scheduledExecutor,
+			log);
+	}
+
+	/**
+	 * Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
+	 *
+	 * @param resourceId Resource Id which identifies the owner of the heartbeat manager
+	 * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
+	 *                          targets
+	 * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
+	 * @param log Logger to be used for the logging
+	 * @param <I> Type of the incoming payload
+	 * @param <O> Type of the outgoing payload
+	 * @return A new HeartbeatManager instance which actively sends heartbeats
+	 */
+	public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+		ResourceID resourceId,
+		HeartbeatListener<I, O> heartbeatListener,
+		ScheduledExecutor scheduledExecutor,
+		Logger log) {
+
+		return new HeartbeatManagerSenderImpl<>(
+			heartbeatInterval,
+			heartbeatTimeout,
+			resourceId,
+			heartbeatListener,
+			scheduledExecutor,
+			scheduledExecutor,
+			log);
+	}
+
+	/**
+	 * Creates a HeartbeatServices instance from a {@link Configuration}.
+	 *
+	 * @param configuration Configuration to be used for the HeartbeatServices creation
+	 * @return A HeartbeatServices instance created from the given configuration
+	 */
+	public static HeartbeatServices fromConfiguration(Configuration configuration) {
+		long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
+
+		long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);
+
+		return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
index 1238f1a..a6e056d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
@@ -40,11 +40,12 @@ public class TestingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O
 			CountDownLatch waitLatch,
 			long heartbeatTimeoutIntervalMs,
 			ResourceID ownResourceID,
+			HeartbeatListener<I, O> heartbeatListener,
 			Executor executor,
 			ScheduledExecutor scheduledExecutor,
 			Logger log) {
 
-		super(heartbeatTimeoutIntervalMs, ownResourceID, executor, scheduledExecutor, log);
+		super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
 
 		this.waitLatch = waitLatch;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
index 7000895..36f7e96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
@@ -38,11 +38,12 @@ public class TestingHeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerSen
 			long heartbeatPeriod,
 			long heartbeatTimeout,
 			ResourceID ownResourceID,
+			HeartbeatListener<I, O> heartbeatListener,
 			Executor executor,
 			ScheduledExecutor scheduledExecutor,
 			Logger log) {
 
-		super(heartbeatPeriod, heartbeatTimeout, ownResourceID, executor, scheduledExecutor, log);
+		super(heartbeatPeriod, heartbeatTimeout, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
 
 		this.waitLatch = waitLatch;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index eced869..33ee29d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,11 +21,10 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
@@ -88,31 +87,47 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	// ------------------------------------------------------------------------
 
 	public JobManagerRunner(
+			final ResourceID resourceId,
 			final JobGraph jobGraph,
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
+			final HeartbeatServices heartbeatServices,
 			final OnCompletionActions toNotifyOnComplete,
-			final FatalErrorHandler errorHandler) throws Exception
-	{
-		this(jobGraph, configuration, rpcService, haServices,
-				new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
-				toNotifyOnComplete, errorHandler);
+			final FatalErrorHandler errorHandler) throws Exception {
+		this(
+			resourceId,
+			jobGraph,
+			configuration,
+			rpcService,
+			haServices,
+			heartbeatServices,
+			new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
+			toNotifyOnComplete,
+			errorHandler);
 	}
 
 	public JobManagerRunner(
+			final ResourceID resourceId,
 			final JobGraph jobGraph,
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
+			final HeartbeatServices heartbeatServices,
 			final MetricRegistry metricRegistry,
 			final OnCompletionActions toNotifyOnComplete,
-			final FatalErrorHandler errorHandler) throws Exception
-	{
-		this(jobGraph, configuration, rpcService, haServices,
-				JobManagerServices.fromConfiguration(configuration, haServices),
-				metricRegistry,
-				toNotifyOnComplete, errorHandler);
+			final FatalErrorHandler errorHandler) throws Exception {
+		this(
+			resourceId,
+			jobGraph,
+			configuration,
+			rpcService,
+			haServices,
+			heartbeatServices,
+			JobManagerServices.fromConfiguration(configuration, haServices),
+			metricRegistry,
+			toNotifyOnComplete,
+			errorHandler);
 	}
 
 	/**
@@ -127,15 +142,16 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	 *                   required services could not be started, ot the Job could not be initialized.
 	 */
 	public JobManagerRunner(
+			final ResourceID resourceId,
 			final JobGraph jobGraph,
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
+			final HeartbeatServices heartbeatServices,
 			final JobManagerServices jobManagerServices,
 			final MetricRegistry metricRegistry,
 			final OnCompletionActions toNotifyOnComplete,
-			final FatalErrorHandler errorHandler) throws Exception
-	{
+			final FatalErrorHandler errorHandler) throws Exception {
 
 		JobManagerMetricGroup jobManagerMetrics = null;
 
@@ -170,31 +186,22 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			this.runningJobsRegistry = haServices.getRunningJobsRegistry();
 			this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
 
-			// heartbeat manager last
-			final ResourceID resourceID = ResourceID.generate();
-			final HeartbeatManagerSenderImpl<Void, Void> jobManagerHeartbeatManager = new HeartbeatManagerSenderImpl<>(
-					configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL),
-					configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT),
-					resourceID,
-					rpcService.getExecutor(),
-					rpcService.getScheduledExecutor(),
-					log);
-
 			// now start the JobManager
 			this.jobManager = new JobMaster(
-					jobGraph, configuration,
-					rpcService,
-					haServices,
-					jobManagerServices.executorService,
-					jobManagerServices.libraryCacheManager,
-					jobManagerServices.restartStrategyFactory,
-					jobManagerServices.rpcAskTimeout,
-					jobManagerMetrics,
-					resourceID,
-					jobManagerHeartbeatManager,
-					this,
-					this,
-					userCodeLoader);
+				resourceId,
+				jobGraph,
+				configuration,
+				rpcService,
+				haServices,
+				heartbeatServices,
+				jobManagerServices.executorService,
+				jobManagerServices.libraryCacheManager,
+				jobManagerServices.restartStrategyFactory,
+				jobManagerServices.rpcAskTimeout,
+				jobManagerMetrics,
+				this,
+				this,
+				userCodeLoader);
 		}
 		catch (Throwable t) {
 			// clean up everything

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 16c243c..243b57f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -51,7 +51,8 @@ import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatManager;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
@@ -105,8 +106,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -129,6 +130,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	// ------------------------------------------------------------------------
 
+	private final ResourceID resourceId;
+
 	/** Logical representation of the job */
 	private final JobGraph jobGraph;
 
@@ -150,10 +153,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	private final MetricGroup jobMetricGroup;
 
 	/** The heartbeat manager with task managers */
-	private final HeartbeatManagerImpl<Void, Void> heartbeatManager;
+	private final HeartbeatManager<Void, Void> heartbeatManager;
 
 	/** The execution context which is used to execute futures */
-	private final ExecutorService executionContext;
+	private final Executor executor;
 
 	private final OnCompletionActions jobCompletionActions;
 
@@ -170,8 +173,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	private volatile UUID leaderSessionID;
 
-	private final ResourceID resourceID;
-
 	// --------- ResourceManager --------
 
 	/** Leader retriever service used to locate ResourceManager's address */
@@ -187,34 +188,38 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	// ------------------------------------------------------------------------
 
 	public JobMaster(
+			ResourceID resourceId,
 			JobGraph jobGraph,
 			Configuration configuration,
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityService,
-			ScheduledExecutorService executorService,
+			HeartbeatServices heartbeatServices,
+			ScheduledExecutorService executor,
 			BlobLibraryCacheManager libraryCacheManager,
 			RestartStrategyFactory restartStrategyFactory,
 			Time rpcAskTimeout,
 			@Nullable JobManagerMetricGroup jobManagerMetricGroup,
-			ResourceID resourceID,
-			HeartbeatManagerImpl<Void, Void> heartbeatManager,
 			OnCompletionActions jobCompletionActions,
 			FatalErrorHandler errorHandler,
-			ClassLoader userCodeLoader) throws Exception
-	{
+			ClassLoader userCodeLoader) throws Exception {
 		super(rpcService);
 
+		this.resourceId = checkNotNull(resourceId);
 		this.jobGraph = checkNotNull(jobGraph);
 		this.configuration = checkNotNull(configuration);
 		this.rpcTimeout = rpcAskTimeout;
 		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
-		this.executionContext = checkNotNull(executorService);
+		this.executor = checkNotNull(executor);
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
 		this.errorHandler = checkNotNull(errorHandler);
 		this.userCodeLoader = checkNotNull(userCodeLoader);
-		this.resourceID = checkNotNull(resourceID);
-		this.heartbeatManager = checkNotNull(heartbeatManager);
+
+		this.heartbeatManager = heartbeatServices.createHeartbeatManagerSender(
+			resourceId,
+			new TaskManagerHeartbeatListener(),
+			rpcService.getScheduledExecutor(),
+			log);
 
 		final String jobName = jobGraph.getName();
 		final JobID jid = jobGraph.getJobID();
@@ -251,8 +256,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			null,
 			jobGraph,
 			configuration,
-			executorService,
-			executorService,
+			executor,
+			executor,
 			slotPool.getSlotProvider(),
 			userCodeLoader,
 			checkpointRecoveryFactory,
@@ -288,27 +293,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			// make sure we receive RPC and async calls
 			super.start();
 
-			heartbeatManager.start(new HeartbeatListener<Void, Void>() {
-				@Override
-				public void notifyHeartbeatTimeout(ResourceID resourceID) {
-					log.info("Notify heartbeat timeout with task manager {}", resourceID);
-					heartbeatManager.unmonitorTarget(resourceID);
-
-					getSelf().disconnectTaskManager(resourceID);
-				}
-
-				@Override
-				public void reportPayload(ResourceID resourceID, Void payload) {
-					// currently there is no payload from task manager and resource manager, so this method will not be called.
-				}
-
-				@Override
-				public Future<Void> retrievePayload() {
-					// currently no need payload.
-					return null;
-				}
-			});
-
 			log.info("JobManager started as leader {} for job {}", leaderSessionID, jobGraph.getJobID());
 			getSelf().startJobExecution();
 		}
@@ -322,8 +306,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 */
 	@Override
 	public void shutDown() throws Exception {
-		// make sure there is a graceful exit
 		heartbeatManager.stop();
+
+		// make sure there is a graceful exit
 		getSelf().suspendExecution(new Exception("JobManager is shutting down."));
 		super.shutDown();
 	}
@@ -371,7 +356,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		}
 
 		// start scheduling job in another thread
-		executionContext.execute(new Runnable() {
+		executor.execute(new Runnable() {
 			@Override
 			public void run() {
 				try {
@@ -545,9 +530,16 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	@RpcMethod
-	public void disconnectTaskManager(final ResourceID resourceID) {
-		registeredTaskManagers.remove(resourceID);
+	public void disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
+		heartbeatManager.unmonitorTarget(resourceID);
 		slotPoolGateway.releaseTaskManager(resourceID);
+
+		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID);
+
+		if (taskManagerConnection != null) {
+			taskManagerConnection.f1.disconnectJobManager(jobGraph.getJobID(), cause);
+		}
+
 	}
 
 	// TODO: This method needs a leader session ID
@@ -743,7 +735,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		if (registeredTaskManagers.containsKey(taskManagerId)) {
 			final RegistrationResponse response = new JMTMRegistrationSuccess(
-					resourceID, libraryCacheManager.getBlobServerPort());
+				resourceId, libraryCacheManager.getBlobServerPort());
 			return FlinkCompletableFuture.completed(response);
 		} else {
 			return getRpcService().execute(new Callable<TaskExecutorGateway>() {
@@ -773,7 +765,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 					// monitor the task manager as heartbeat target
 					heartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
 						@Override
-						public void sendHeartbeat(ResourceID resourceID, Void payload) {
+						public void receiveHeartbeat(ResourceID resourceID, Void payload) {
 							// the task manager will not request heartbeat, so this method will never be called currently
 						}
 
@@ -783,7 +775,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 						}
 					});
 
-					return new JMTMRegistrationSuccess(resourceID, libraryCacheManager.getBlobServerPort());
+					return new JMTMRegistrationSuccess(resourceId, libraryCacheManager.getBlobServerPort());
 				}
 			}, getMainThreadExecutor());
 		}
@@ -799,7 +791,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	@RpcMethod
 	public void heartbeatFromTaskManager(final ResourceID resourceID) {
-		heartbeatManager.sendHeartbeat(resourceID, null);
+		heartbeatManager.receiveHeartbeat(resourceID, null);
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -903,7 +895,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
 			resourceManagerConnection = new ResourceManagerConnection(
 					log, jobGraph.getJobID(), getAddress(), leaderSessionID,
-					resourceManagerAddress, resourceManagerLeaderId, executionContext);
+					resourceManagerAddress, resourceManagerLeaderId, executor);
 			resourceManagerConnection.start();
 		}
 	}
@@ -1046,4 +1038,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			});
 		}
 	}
+
+	private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
+
+		@Override
+		public void notifyHeartbeatTimeout(ResourceID resourceID) {
+			log.info("Task manager with id {} timed out.", resourceID);
+
+			getSelf().disconnectTaskManager(
+				resourceID,
+				new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out."));
+		}
+
+		@Override
+		public void reportPayload(ResourceID resourceID, Void payload) {
+			// nothing to do since there is no payload
+		}
+
+		@Override
+		public Future<Void> retrievePayload() {
+			return FlinkCompletableFuture.completed(null);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index e7e3111..13a7372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -124,8 +124,9 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * {@link JobMaster}.
 	 *
 	 * @param resourceID identifying the TaskManager to disconnect
+	 * @param cause for the disconnection of the TaskManager
 	 */
-	void disconnectTaskManager(ResourceID resourceID);
+	void disconnectTaskManager(ResourceID resourceID, Exception cause);
 
 	/**
 	 * Disconnects the resource manager from the job manager because of the given cause.

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1933554..25c4aba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -82,6 +83,9 @@ public class MiniCluster {
 	private HighAvailabilityServices haServices;
 
 	@GuardedBy("lock")
+	private HeartbeatServices heartbeatServices;
+
+	@GuardedBy("lock")
 	private ResourceManagerRunner[] resourceManagerRunners;
 
 	@GuardedBy("lock")
@@ -232,6 +236,8 @@ public class MiniCluster {
 				LOG.info("Starting high-availability services");
 				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
 
+				heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
+
 				// bring up the ResourceManager(s)
 				LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
 				resourceManagerRunners = startResourceManagers(
@@ -245,7 +251,12 @@ public class MiniCluster {
 				// bring up the dispatcher that launches JobManagers when jobs submitted
 				LOG.info("Starting job dispatcher(s) for {} JobManger(s)", numJobManagers);
 				jobDispatcher = new MiniClusterJobDispatcher(
-						configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
+					configuration,
+					haServices,
+					heartbeatServices,
+					metricRegistry,
+					numJobManagers,
+					jobManagerRpcServices);
 			}
 			catch (Exception e) {
 				// cleanup everything
@@ -533,6 +544,7 @@ public class MiniCluster {
 				new ResourceID(UUID.randomUUID().toString()),
 				taskManagerRpcServices[i],
 				haServices,
+				heartbeatServices,
 				metricRegistry,
 				localCommunication);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index dd80ada..1f8ae80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -63,6 +65,9 @@ public class MiniClusterJobDispatcher {
 	/** services for discovery, leader election, and recovery */
 	private final HighAvailabilityServices haServices;
 
+	/** services for heartbeating */
+	private final HeartbeatServices heartbeatServices;
+
 	/** all the services that the JobManager needs, such as BLOB service, factories, etc */
 	private final JobManagerServices jobManagerServices;
 
@@ -94,8 +99,9 @@ public class MiniClusterJobDispatcher {
 			Configuration config,
 			RpcService rpcService,
 			HighAvailabilityServices haServices,
+			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry) throws Exception {
-		this(config, haServices, metricRegistry, 1, new RpcService[] { rpcService });
+		this(config, haServices, heartbeatServices, metricRegistry, 1, new RpcService[] { rpcService });
 	}
 
 	/**
@@ -113,6 +119,7 @@ public class MiniClusterJobDispatcher {
 	public MiniClusterJobDispatcher(
 			Configuration config,
 			HighAvailabilityServices haServices,
+			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			int numJobManagers,
 			RpcService[] rpcServices) throws Exception {
@@ -123,6 +130,7 @@ public class MiniClusterJobDispatcher {
 		this.configuration = checkNotNull(config);
 		this.rpcServices = rpcServices;
 		this.haServices = checkNotNull(haServices);
+		this.heartbeatServices = checkNotNull(heartbeatServices);
 		this.metricRegistry = checkNotNull(metricRegistry);
 		this.numJobManagers = numJobManagers;
 
@@ -232,9 +240,17 @@ public class MiniClusterJobDispatcher {
 		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
 		for (int i = 0; i < numJobManagers; i++) {
 			try {
-				runners[i] = new JobManagerRunner(job, configuration,
-						rpcServices[i], haServices, jobManagerServices, metricRegistry, 
-						onCompletion, errorHandler);
+				runners[i] = new JobManagerRunner(
+					ResourceID.generate(),
+					job,
+					configuration,
+					rpcServices[i],
+					haServices,
+					heartbeatServices,
+					jobManagerServices,
+					metricRegistry,
+					onCompletion,
+					errorHandler);
 				runners[i].start();
 			}
 			catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e601b0b..00a1bf8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -39,7 +40,8 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatManager;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -131,7 +133,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	private final MetricRegistry metricRegistry;
 
 	/** The heartbeat manager for job manager in the task manager */
-	private final HeartbeatManagerImpl<Void, Void> jobManagerHeartbeatManager;
+	private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
 	/** The fatal error handler to use in case of a fatal error */
 	private final FatalErrorHandler fatalErrorHandler;
@@ -168,8 +170,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		IOManager ioManager,
 		NetworkEnvironment networkEnvironment,
 		HighAvailabilityServices haServices,
+		HeartbeatServices heartbeatServices,
 		MetricRegistry metricRegistry,
-		HeartbeatManagerImpl<Void, Void> jobManagerHeartbeatManager,
 		TaskManagerMetricGroup taskManagerMetricGroup,
 		BroadcastVariableManager broadcastVariableManager,
 		FileCache fileCache,
@@ -189,7 +191,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		this.networkEnvironment = checkNotNull(networkEnvironment);
 		this.haServices = checkNotNull(haServices);
 		this.metricRegistry = checkNotNull(metricRegistry);
-		this.jobManagerHeartbeatManager = checkNotNull(jobManagerHeartbeatManager);
 		this.taskSlotTable = checkNotNull(taskSlotTable);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 		this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
@@ -199,6 +200,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		this.jobLeaderService = checkNotNull(jobLeaderService);
 
 		this.jobManagerConnections = new HashMap<>(4);
+
+		this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
+			getResourceID(),
+			new JobManagerHeartbeatListener(),
+			rpcService.getScheduledExecutor(),
+			log);
 	}
 
 	// ------------------------------------------------------------------------
@@ -221,38 +228,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		// start the job leader service
 		jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
-
-		// start the heartbeat manager for monitoring job manager
-		jobManagerHeartbeatManager.start(new HeartbeatListener<Void, Void>() {
-			@Override
-			public void notifyHeartbeatTimeout(final ResourceID resourceID) {
-				runAsync(new Runnable() {
-					@Override
-					public void run() {
-						log.info("Notify heartbeat timeout with job manager {}", resourceID);
-						jobManagerHeartbeatManager.unmonitorTarget(resourceID);
-
-						if (jobManagerConnections.containsKey(resourceID)) {
-							JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
-							if (jobManagerConnection != null) {
-								closeJobManagerConnection(jobManagerConnection.getJobID());
-							}
-						}
-					}
-				});
-			}
-
-			@Override
-			public void reportPayload(ResourceID resourceID, Void payload) {
-				// currently there is no payload from job manager, so this method will not be called.
-			}
-
-			@Override
-			public Future<Void> retrievePayload() {
-				// currently no need payload.
-				return null;
-			}
-		});
 	}
 
 	/**
@@ -644,6 +619,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		return new TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId);
 	}
 
+	@RpcMethod // RPC counterpart of TaskExecutorGateway#disconnectJobManager
+	public void disconnectJobManager(JobID jobId, Exception cause) {
+		closeJobManagerConnection(jobId, cause); // pure delegation; the cause is handed on unchanged
+	}
+
 	// ======================================================================
 	//  Internal methods
 	// ======================================================================
@@ -786,7 +766,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		if (jobManagerTable.contains(jobId)) {
 			JobManagerConnection oldJobManagerConnection = jobManagerTable.get(jobId);
 			if (!oldJobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
-				closeJobManagerConnection(jobId);
+				closeJobManagerConnection(jobId, new Exception("Found new job leader for job id " + jobId + '.'));
 			}
 		}
 
@@ -803,20 +783,20 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		// monitor the job manager as heartbeat target
 		jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<Void>() {
 			@Override
-			public void sendHeartbeat(ResourceID resourceID, Void payload) {
+			public void receiveHeartbeat(ResourceID resourceID, Void payload) {
 				jobMasterGateway.heartbeatFromTaskManager(resourceID);
 			}
 
 			@Override
 			public void requestHeartbeat(ResourceID resourceID, Void payload) {
-				// request heartbeat will never be called in task manager side
+				// request heartbeat will never be called on the task manager side
 			}
 		});
 
 		offerSlotsToJobManager(jobId);
 	}
 
-	private void closeJobManagerConnection(JobID jobId) {
+	private void closeJobManagerConnection(JobID jobId, Exception cause) {
 		log.info("Close JobManager connection for job {}.", jobId);
 
 		// 1. fail tasks running under this JobID
@@ -847,8 +827,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		if (jobManagerConnection != null) {
 			try {
+				jobManagerHeartbeatManager.unmonitorTarget(jobManagerConnection.getResourceID());
+
 				jobManagerConnections.remove(jobManagerConnection.getResourceID());
-				disassociateFromJobManager(jobManagerConnection);
+				disassociateFromJobManager(jobManagerConnection, cause);
 			} catch (IOException e) {
 				log.warn("Could not properly disassociate from JobManager {}.",
 					jobManagerConnection.getJobManagerGateway().getAddress(), e);
@@ -909,10 +891,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			partitionStateChecker);
 	}
 
-	private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException {
+	private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException {
 		Preconditions.checkNotNull(jobManagerConnection);
 		JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
-		jobManagerGateway.disconnectTaskManager(getResourceID());
+		jobManagerGateway.disconnectTaskManager(getResourceID(), cause);
 		jobManagerConnection.getLibraryCacheManager().shutdown();
 	}
 
@@ -1138,7 +1120,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			runAsync(new Runnable() {
 				@Override
 				public void run() {
-					closeJobManagerConnection(jobId);
+					closeJobManagerConnection(
+						jobId,
+						new Exception("Job leader for job id " + jobId + " lost leadership."));
 				}
 			});
 		}
@@ -1220,4 +1204,37 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			});
 		}
 	}
+
+	private class JobManagerHeartbeatListener implements HeartbeatListener<Void, Void> { // listener handed to the JobManager heartbeat manager
+
+		@Override
+		public void notifyHeartbeatTimeout(final ResourceID resourceID) { // closes the connection of the JobManager whose heartbeat timed out
+			runAsync(new Runnable() { // the handling is submitted via runAsync instead of being executed inline
+				@Override
+				public void run() {
+					log.info("The JobManager connection {} has timed out.", resourceID);
+
+					if (jobManagerConnections.containsKey(resourceID)) { // resource ids without a registered connection are only logged
+						JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
+						if (jobManagerConnection != null) {
+							closeJobManagerConnection( // connections are closed by job id, hence the lookup above
+								jobManagerConnection.getJobID(),
+								new TimeoutException("The heartbeat of JobManager with id " +
+									resourceID + " timed out."));
+						}
+					}
+				}
+			});
+		}
+
+		@Override
+		public void reportPayload(ResourceID resourceID, Void payload) {
+			// nothing to do since the payload is of type Void
+		}
+
+		@Override
+		public Future<Void> retrievePayload() {
+			return FlinkCompletableFuture.completed(null); // Void payload: answer with an already completed future
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 95db932..2dcc3a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -134,9 +134,17 @@ public interface TaskExecutorGateway extends RpcGateway {
 	Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
 
 	/**
-	 * Request heartbeat from the job manager
+	 * Heartbeat request from the job manager
 	 *
-	 * @param resourceID unique id of the job manager
+	 * @param heartbeatOrigin unique id of the job manager
 	 */
-	void heartbeatFromJobManager(ResourceID resourceID);
+	void heartbeatFromJobManager(ResourceID heartbeatOrigin);
+
+	/**
+	 * Disconnects the JobManager of the given job from the TaskManager.
+	 *
+	 * @param jobId id of the job for which the JobManager was the leader
+	 * @param cause reason for the disconnection from the JobManager
+	 */
+	void disconnectJobManager(JobID jobId, Exception cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 402421c..c99eb91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -21,11 +21,10 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -73,18 +72,27 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			ResourceID resourceID,
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry) throws Exception {
 
-		this(configuration, resourceID, rpcService, highAvailabilityServices, metricRegistry, false);
+		this(
+			configuration,
+			resourceID,
+			rpcService,
+			highAvailabilityServices,
+			heartbeatServices,
+			metricRegistry,
+			false);
 	}
 
 	public TaskManagerRunner(
-		Configuration configuration,
-		ResourceID resourceID,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices,
-		MetricRegistry metricRegistry,
-		boolean localCommunicationOnly) throws Exception {
+			Configuration configuration,
+			ResourceID resourceID,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			boolean localCommunicationOnly) throws Exception {
 
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.resourceID = Preconditions.checkNotNull(resourceID);
@@ -114,13 +122,6 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		// Initialize the TM metrics
 		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
 
-		HeartbeatManagerImpl<Void, Void> heartbeatManager = new HeartbeatManagerImpl<>(
-				configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT),
-				resourceID,
-				executor,
-				rpcService.getScheduledExecutor(),
-				LOG);
-
 		this.taskManager = new TaskExecutor(
 			taskManagerConfiguration,
 			taskManagerServices.getTaskManagerLocation(),
@@ -129,8 +130,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			taskManagerServices.getIOManager(),
 			taskManagerServices.getNetworkEnvironment(),
 			highAvailabilityServices,
+			heartbeatServices,
 			metricRegistry,
-			heartbeatManager,
 			taskManagerMetricGroup,
 			taskManagerServices.getBroadcastVariableManager(),
 			taskManagerServices.getFileCache(),

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 1a9818e..d2221c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -99,11 +101,15 @@ public class JobManagerRunnerMockTest {
 		when(haServices.createBlobStore()).thenReturn(blobStore);
 		when(haServices.getRunningJobsRegistry()).thenReturn(runningJobsRegistry);
 
+		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+
 		runner = PowerMockito.spy(new JobManagerRunner(
+			ResourceID.generate(),
 			new JobGraph("test", new JobVertex("vertex")),
 			mock(Configuration.class),
 			mockRpc,
 			haServices,
+			heartbeatServices,
 			JobManagerServices.fromConfiguration(new Configuration(), haServices),
 			new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
 			jobCompletion,

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index cdad87f..567a8fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -19,17 +19,18 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatListener;
+import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
-import org.apache.flink.runtime.heartbeat.TestingHeartbeatManagerSenderImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.instance.SlotPoolGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
@@ -37,24 +38,22 @@ import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
+import org.slf4j.Logger;
 
 import java.net.InetAddress;
 import java.net.URL;
-import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.*;
 
@@ -84,32 +83,28 @@ public class JobMasterTest extends TestLogger {
 
 		final long heartbeatInterval = 1L;
 		final long heartbeatTimeout = 5L;
-		final CountDownLatch waitLatch = new CountDownLatch(1);
-		final HeartbeatManagerSenderImpl<Void, Void> jmHeartbeatManager = new TestingHeartbeatManagerSenderImpl<>(
-				waitLatch,
-				heartbeatInterval,
-				heartbeatTimeout,
-				jmResourceId,
-				rpc.getExecutor(),
-				rpc.getScheduledExecutor(),
-				log);
+
+		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
+
+		final JobGraph jobGraph = new JobGraph();
 
 		try {
 			final JobMaster jobMaster = new JobMaster(
-					new JobGraph(),
-					new Configuration(),
-					rpc,
-					haServices,
-					Executors.newScheduledThreadPool(1),
-					mock(BlobLibraryCacheManager.class),
-					mock(RestartStrategyFactory.class),
-					Time.of(10, TimeUnit.SECONDS),
-					null,
-					jmResourceId,
-					jmHeartbeatManager,
-					mock(OnCompletionActions.class),
-					testingFatalErrorHandler,
-					new FlinkUserCodeClassLoader(new URL[0]));
+				jmResourceId,
+				jobGraph,
+				new Configuration(),
+				rpc,
+				haServices,
+				heartbeatServices,
+				Executors.newScheduledThreadPool(1),
+				mock(BlobLibraryCacheManager.class),
+				mock(RestartStrategyFactory.class),
+				Time.of(10, TimeUnit.SECONDS),
+				null,
+				mock(OnCompletionActions.class),
+				testingFatalErrorHandler,
+				new FlinkUserCodeClassLoader(new URL[0]));
 
 			// also start the heartbeat manager in job manager
 			jobMaster.start(jmLeaderId);
@@ -117,24 +112,29 @@ public class JobMasterTest extends TestLogger {
 			// register task manager will trigger monitoring heartbeat target, schedule heartbeat request in interval time
 			jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId);
 
-			verify(taskExecutorGateway, atLeast(1)).heartbeatFromJobManager(eq(jmResourceId));
+			ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+			verify(scheduledExecutor, times(1)).scheduleAtFixedRate(
+				heartbeatRunnableCaptor.capture(),
+				eq(0L),
+				eq(heartbeatInterval),
+				eq(TimeUnit.MILLISECONDS));
+
+			Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue();
+
+			ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+			verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
 
-			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(jmHeartbeatManager, "heartbeatTargets");
-			final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTMsInJM = Whitebox.getInternalState(jobMaster, "registeredTaskManagers");
-			final SlotPoolGateway slotPoolGateway = mock(SlotPoolGateway.class);
-			Whitebox.setInternalState(jobMaster, "slotPoolGateway", slotPoolGateway);
+			Runnable timeoutRunnable = timeoutRunnableCaptor.getValue();
 
-			// before heartbeat timeout
-			assertTrue(heartbeatTargets.containsKey(tmResourceId));
-			assertTrue(registeredTMsInJM.containsKey(tmResourceId));
+			// run the first heartbeat request
+			heartbeatRunnable.run();
 
-			// continue to unmonitor heartbeat target
-			waitLatch.countDown();
+			verify(taskExecutorGateway, times(1)).heartbeatFromJobManager(eq(jmResourceId));
 
-			// after heartbeat timeout
-			verify(slotPoolGateway, timeout(heartbeatTimeout * 5)).releaseTaskManager(eq(tmResourceId));
-			assertFalse(heartbeatTargets.containsKey(tmResourceId));
-			assertFalse(registeredTMsInJM.containsKey(tmResourceId));
+			// run the timeout runnable to simulate a heartbeat timeout
+			timeoutRunnable.run();
+
+			verify(taskExecutorGateway).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class));
 
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
@@ -143,4 +143,32 @@ public class JobMasterTest extends TestLogger {
 			rpc.stopService();
 		}
 	}
+
+	private static class TestingHeartbeatServices extends HeartbeatServices { // HeartbeatServices variant that pins the ScheduledExecutor of sender heartbeat managers
+
+		private final ScheduledExecutor scheduledExecutorToUse; // passed to every HeartbeatManagerSenderImpl created below
+
+		public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
+			super(heartbeatInterval, heartbeatTimeout);
+
+			this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse); // a null executor is rejected up front
+		}
+
+		@Override
+		public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+			ResourceID resourceId,
+			HeartbeatListener<I, O> heartbeatListener,
+			ScheduledExecutor scheduledExecutor, // not used; scheduledExecutorToUse is passed on instead
+			Logger log) {
+
+			return new HeartbeatManagerSenderImpl<>(
+				heartbeatInterval,
+				heartbeatTimeout,
+				resourceId,
+				heartbeatListener,
+				org.apache.flink.runtime.concurrent.Executors.directExecutor(),
+				scheduledExecutorToUse,
+				log);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 5ffc97e..16edbf7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -67,6 +67,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.RETURNS_MOCKS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -104,7 +105,7 @@ public class TaskExecutorITCase {
 			rpcService.getScheduledExecutor(),
 			resourceManagerConfiguration.getJobTimeout());
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
-		HeartbeatManagerImpl heartbeatManager = mock(HeartbeatManagerImpl.class);
+		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class, RETURNS_MOCKS);
 
 		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234);
@@ -135,8 +136,8 @@ public class TaskExecutorITCase {
 			ioManager,
 			networkEnvironment,
 			testingHAServices,
+			heartbeatServices,
 			metricRegistry,
-			heartbeatManager,
 			taskManagerMetricGroup,
 			broadcastVariableManager,
 			fileCache,

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index f500246..0f5bad3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -39,8 +40,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
-import org.apache.flink.runtime.heartbeat.TestingHeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -82,16 +84,17 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.Matchers;
-import org.powermock.reflect.Whitebox;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
 
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertFalse;
@@ -124,15 +127,27 @@ public class TaskExecutorTest extends TestLogger {
 
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
-		final CountDownLatch waitLatch =  new CountDownLatch(1);
 		final long heartbeatTimeout = 10L;
-		final HeartbeatManagerImpl<Void, Void> tmHeartbeatManager = new TestingHeartbeatManagerImpl<>(
-				waitLatch,
-				heartbeatTimeout,
-				tmResourceId,
-				rpc.getExecutor(),
-				rpc.getScheduledExecutor(),
-				log);
+
+		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+		when(heartbeatServices.createHeartbeatManager(
+			eq(taskManagerLocation.getResourceID()),
+			any(HeartbeatListener.class),
+			any(ScheduledExecutor.class),
+			any(Logger.class))).thenAnswer(
+			new Answer<HeartbeatManagerImpl<Void, Void>>() {
+				@Override
+				public HeartbeatManagerImpl<Void, Void> answer(InvocationOnMock invocation) throws Throwable {
+					return new HeartbeatManagerImpl<>(
+						heartbeatTimeout,
+						taskManagerLocation.getResourceID(),
+						(HeartbeatListener<Void, Void>)invocation.getArguments()[1],
+						(Executor)invocation.getArguments()[2],
+						(ScheduledExecutor)invocation.getArguments()[2],
+						(Logger)invocation.getArguments()[3]);
+				}
+			}
+		);
 
 		final String jobMasterAddress = "jm";
 		final UUID jmLeaderId = UUID.randomUUID();
@@ -147,25 +162,26 @@ public class TaskExecutorTest extends TestLogger {
 				any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
+		when(jobMasterGateway.getHostname()).thenReturn("localhost");
 
 		try {
 			final TaskExecutor taskManager = new TaskExecutor(
-					tmConfig,
-					taskManagerLocation,
-					rpc,
-					mock(MemoryManager.class),
-					mock(IOManager.class),
-					mock(NetworkEnvironment.class),
-					haServices,
-					mock(MetricRegistry.class),
-					tmHeartbeatManager,
-					mock(TaskManagerMetricGroup.class),
-					mock(BroadcastVariableManager.class),
-					mock(FileCache.class),
-					taskSlotTable,
-					new JobManagerTable(),
-					jobLeaderService,
-					testingFatalErrorHandler);
+				tmConfig,
+				taskManagerLocation,
+				rpc,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				mock(NetworkEnvironment.class),
+				haServices,
+				heartbeatServices,
+				mock(MetricRegistry.class),
+				mock(TaskManagerMetricGroup.class),
+				mock(BroadcastVariableManager.class),
+				mock(FileCache.class),
+				taskSlotTable,
+				new JobManagerTable(),
+				jobLeaderService,
+				testingFatalErrorHandler);
 
 			taskManager.start();
 
@@ -182,23 +198,8 @@ public class TaskExecutorTest extends TestLogger {
 			verify(jobMasterGateway).registerTaskManager(
 					eq(taskManager.getAddress()), eq(taskManagerLocation), eq(jmLeaderId), any(Time.class));
 
-			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(tmHeartbeatManager, "heartbeatTargets");
-			final JobManagerTable jobManagerTable = Whitebox.getInternalState(taskManager, "jobManagerTable");
-			final Map<ResourceID, JobManagerConnection> jobManagerConnections = Whitebox.getInternalState(taskManager, "jobManagerConnections");
-
-			// before heartbeat timeout
-			assertTrue(heartbeatTargets.containsKey(jmResourceId));
-			assertTrue(jobManagerTable.contains(jobId));
-			assertTrue(jobManagerConnections.containsKey(jmResourceId));
-
-			// continue to unmonitor heartbeat target
-			waitLatch.countDown();
-
-			// after heartbeat timeout
-			verify(jobMasterGateway, timeout(heartbeatTimeout)).disconnectTaskManager(eq(tmResourceId));
-			assertFalse(heartbeatTargets.containsKey(jmResourceId));
-			assertFalse(jobManagerTable.contains(jobId));
-			assertFalse(jobManagerConnections.containsKey(jmResourceId));
+			// the timeout should trigger disconnecting from the JobManager
+			verify(jobMasterGateway, timeout(heartbeatTimeout * 5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
 
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
@@ -247,8 +248,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -324,8 +325,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -456,8 +457,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				networkEnvironment,
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				taskManagerMetricGroup,
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -563,8 +564,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -602,7 +603,7 @@ public class TaskExecutorTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that accepted slots go into state assigned and the others are returned to the  resource
+	 * Tests that accepted slots go into state assigned and the others are returned to the resource
 	 * manager.
 	 */
 	@Test
@@ -649,7 +650,6 @@ public class TaskExecutorTest extends TestLogger {
 		final AllocationID allocationId2 = new AllocationID();
 
 		final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
-		final SlotOffer offer2 = new SlotOffer(allocationId2, 0, ResourceProfile.UNKNOWN);
 
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 
@@ -677,8 +677,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -752,8 +752,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -895,8 +895,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index ddeb02e..6fb7c86 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -90,6 +92,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 	private HighAvailabilityServices haServices;
 
 	@GuardedBy("lock")
+	private HeartbeatServices heartbeatServices;
+
+	@GuardedBy("lock")
 	private RpcService commonRpcService;
 
 	@GuardedBy("lock")
@@ -135,6 +140,8 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 			synchronized (lock) {
 				LOG.info("Starting High Availability Services");
 				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+
+				heartbeatServices = HeartbeatServices.fromConfiguration(config);
 				
 				metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 				commonRpcService = createRpcService(config, appMasterHostname, amPortRange);
@@ -210,11 +217,14 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 
 		// now the JobManagerRunner
 		return new JobManagerRunner(
-				jobGraph, config,
-				commonRpcService,
-				haServices,
-				this,
-				this);
+			ResourceID.generate(),
+			jobGraph,
+			config,
+			commonRpcService,
+			haServices,
+			heartbeatServices,
+			this,
+			this);
 	}
 
 	protected void shutdown(ApplicationStatus status, String msg) {

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 414c3de..2369765 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -208,11 +209,19 @@ public class YarnTaskExecutorRunner {
 			LOG.info("YARN assigned resource id {} for the task executor.", resourceID.toString());
 
 			haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+			HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(config);
+
 			metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 			// ---- (2) init task manager runner -------
 			taskExecutorRpcService = TaskManagerRunner.createRpcService(config, haServices);
-			taskManagerRunner = new TaskManagerRunner(config, resourceID, taskExecutorRpcService, haServices, metricRegistry);
+			taskManagerRunner = new TaskManagerRunner(
+				config,
+				resourceID,
+				taskExecutorRpcService,
+				haServices,
+				heartbeatServices,
+				metricRegistry);
 
 			// ---- (3) start the task manager runner
 			taskManagerRunner.start();