You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/03/17 16:00:23 UTC

[1/2] flink git commit: [FLINK-4364] [heartbeats] Implement TaskManager side of heartbeat from JobManager

Repository: flink
Updated Branches:
  refs/heads/master 521a53d9a -> 97ccc1473


[FLINK-4364] [heartbeats] Implement TaskManager side of heartbeat from JobManager

This closes #3151.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b3d5c27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b3d5c27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b3d5c27

Branch: refs/heads/master
Commit: 0b3d5c27f4ab7b2dffb37160a1f01cb822bb696e
Parents: 521a53d
Author: 淘江 <ta...@alibaba-inc.com>
Authored: Wed Jan 18 19:08:19 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Mar 17 17:00:04 2017 +0100

----------------------------------------------------------------------
 .../configuration/HeartbeatManagerOptions.java  |  45 ++++++
 .../runtime/heartbeat/HeartbeatManagerImpl.java |  18 +--
 .../heartbeat/HeartbeatManagerSenderImpl.java   |  16 +-
 .../heartbeat/TestingHeartbeatManagerImpl.java  |  62 ++++++++
 .../TestingHeartbeatManagerSenderImpl.java      |  60 ++++++++
 .../runtime/jobmaster/JobManagerRunner.java     |  15 ++
 .../flink/runtime/jobmaster/JobMaster.java      |  62 +++++++-
 .../runtime/jobmaster/JobMasterGateway.java     |   7 +
 .../taskexecutor/JobManagerConnection.java      |  20 +++
 .../runtime/taskexecutor/TaskExecutor.java      |  92 +++++++++++-
 .../taskexecutor/TaskExecutorGateway.java       |   8 +
 .../runtime/taskexecutor/TaskManagerRunner.java |  10 ++
 .../runtime/heartbeat/HeartbeatManagerTest.java |  22 +--
 .../flink/runtime/jobmaster/JobMasterTest.java  | 146 +++++++++++++++++++
 .../taskexecutor/TaskExecutorITCase.java        |   3 +
 .../runtime/taskexecutor/TaskExecutorTest.java  | 127 +++++++++++++++-
 16 files changed, 669 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
new file mode 100644
index 0000000..2258eb1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to heartbeat manager settings.
+ */
+@PublicEvolving
+public class HeartbeatManagerOptions {
+
+	/** Time interval for requesting heartbeat from sender side */
+	public static final ConfigOption<Long> HEARTBEAT_INTERVAL =
+			key("heartbeat.sender.interval")
+			.defaultValue(10000L);
+
+	/** Timeout for requesting and receiving heartbeat for both sender and receiver sides */
+	public static final ConfigOption<Long> HEARTBEAT_TIMEOUT =
+			key("heartbeat.timeout")
+			.defaultValue(50000L);
+
+	// ------------------------------------------------------------------------
+
+	/** Not intended to be instantiated */
+	private HeartbeatManagerOptions() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 42b1c85..9860b4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.heartbeat;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
@@ -28,7 +29,6 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -56,7 +56,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 	private final HeartbeatListener<I, O> heartbeatListener;
 
 	/** Executor service used to run heartbeat timeout notifications */
-	private final ScheduledExecutorService scheduledExecutorService;
+	private final ScheduledExecutor scheduledExecutor;
 
 	protected final Logger log;
 
@@ -74,14 +74,14 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 			ResourceID ownResourceID,
 			HeartbeatListener<I, O> heartbeatListener,
 			Executor executor,
-			ScheduledExecutorService scheduledExecutorService,
+			ScheduledExecutor scheduledExecutor,
 			Logger log) {
 		Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout has to be larger than 0.");
 
 		this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
 		this.ownResourceID = Preconditions.checkNotNull(ownResourceID);
 		this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener, "heartbeatListener");
-		this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+		this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
 		this.log = Preconditions.checkNotNull(log);
 		this.executor = Preconditions.checkNotNull(executor);
 		this.heartbeatTargets = new ConcurrentHashMap<>(16);
@@ -122,7 +122,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 				HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
 					resourceID,
 					heartbeatTarget,
-					scheduledExecutorService,
+					scheduledExecutor,
 					heartbeatListener,
 					heartbeatTimeoutIntervalMs);
 
@@ -236,7 +236,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 		/** Associated heartbeat target */
 		private final HeartbeatTarget<O> heartbeatTarget;
 
-		private final ScheduledExecutorService scheduledExecutorService;
+		private final ScheduledExecutor scheduledExecutor;
 
 		/** Listener which is notified about heartbeat timeouts */
 		private final HeartbeatListener<?, ?> heartbeatListener;
@@ -251,13 +251,13 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 		HeartbeatMonitor(
 			ResourceID resourceID,
 			HeartbeatTarget<O> heartbeatTarget,
-			ScheduledExecutorService scheduledExecutorService,
+			ScheduledExecutor scheduledExecutor,
 			HeartbeatListener<?, O> heartbeatListener,
 			long heartbeatTimeoutIntervalMs) {
 
 			this.resourceID = Preconditions.checkNotNull(resourceID);
 			this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget);
-			this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+			this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
 			this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener);
 
 			Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat timeout interval has to be larger than 0.");
@@ -278,7 +278,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 			if (state.get() == State.RUNNING) {
 				cancelTimeout();
 
-				futureTimeout = scheduledExecutorService.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);
+				futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);
 
 				// Double check for concurrent accesses (e.g. a firing of the scheduled future)
 				if (state.get() != State.RUNNING) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
index 57c8671..32f8aa3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.heartbeat;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.slf4j.Logger;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
  * its monitored {@link HeartbeatTarget}. The heartbeat period is configurable.
  *
  * @param <I> Type of the incoming heartbeat payload
- * @param <O> Type of the outgoind heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload
  */
 public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
 
@@ -44,18 +44,18 @@ public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O>
 			long heartbeatTimeout,
 			ResourceID ownResourceID,
 			HeartbeatListener<I, O> heartbeatListener,
-			ExecutorService executorService,
-			ScheduledExecutorService scheduledExecutorService,
+			Executor executor,
+			ScheduledExecutor scheduledExecutor,
 			Logger log) {
 		super(
 			heartbeatTimeout,
 			ownResourceID,
 			heartbeatListener,
-			executorService,
-			scheduledExecutorService,
+			executor,
+			scheduledExecutor,
 			log);
 
-		triggerFuture = scheduledExecutorService.scheduleAtFixedRate(this, 0L, heartbeatPeriod, TimeUnit.MILLISECONDS);
+		triggerFuture = scheduledExecutor.scheduleAtFixedRate(this, 0L, heartbeatPeriod, TimeUnit.MILLISECONDS);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
new file mode 100644
index 0000000..1238f1a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.slf4j.Logger;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+/**
+ * Heartbeat manager implementation which extends {@link HeartbeatManagerImpl} for testing.
+ * It overrides {@link #unmonitorTarget(ResourceID)} to block on a latch, so that tests can
+ * finish their checks before a target is unmonitored after a heartbeat timeout.
+ *
+ * @param <I> Type of the incoming heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload
+ */
+public class TestingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> {
+
+	private final CountDownLatch waitLatch;
+
+	public TestingHeartbeatManagerImpl(
+			CountDownLatch waitLatch,
+			long heartbeatTimeoutIntervalMs,
+			ResourceID ownResourceID,
+			Executor executor,
+			ScheduledExecutor scheduledExecutor,
+			Logger log) {
+
+		super(heartbeatTimeoutIntervalMs, ownResourceID, executor, scheduledExecutor, log);
+
+		this.waitLatch = waitLatch;
+	}
+
+	@Override
+	public void unmonitorTarget(ResourceID resourceID) {
+		try {
+			waitLatch.await();
+		} catch (InterruptedException ex) {
+			log.error("Unexpected interrupted exception.", ex);
+		}
+
+		super.unmonitorTarget(resourceID);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
new file mode 100644
index 0000000..7000895
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.slf4j.Logger;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+/**
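+ * Testing variant of {@link HeartbeatManagerSenderImpl} whose {@link #unmonitorTarget(ResourceID)} waits on a latch first.
+ * @param <I> Type of the incoming heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload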
+ */
+public class TestingHeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerSenderImpl<I, O> {
+
+	private final CountDownLatch waitLatch;
+
+	public TestingHeartbeatManagerSenderImpl(
+			CountDownLatch waitLatch,
+			long heartbeatPeriod,
+			long heartbeatTimeout,
+			ResourceID ownResourceID,
+			Executor executor,
+			ScheduledExecutor scheduledExecutor,
+			Logger log) {
+
+		super(heartbeatPeriod, heartbeatTimeout, ownResourceID, executor, scheduledExecutor, log);
+
+		this.waitLatch = waitLatch;
+	}
+
+	@Override
+	public void unmonitorTarget(ResourceID resourceID) {
+		try {
+			waitLatch.await();
+		} catch (InterruptedException ex) {
+			log.error("Unexpected interrupted exception.", ex);
+		}
+
+		super.unmonitorTarget(resourceID);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 6e02813..eced869 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,8 +21,11 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
@@ -167,6 +170,16 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			this.runningJobsRegistry = haServices.getRunningJobsRegistry();
 			this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
 
+			// heartbeat manager last
+			final ResourceID resourceID = ResourceID.generate();
+			final HeartbeatManagerSenderImpl<Void, Void> jobManagerHeartbeatManager = new HeartbeatManagerSenderImpl<>(
+					configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL),
+					configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT),
+					resourceID,
+					rpcService.getExecutor(),
+					rpcService.getScheduledExecutor(),
+					log);
+
 			// now start the JobManager
 			this.jobManager = new JobMaster(
 					jobGraph, configuration,
@@ -177,6 +190,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 					jobManagerServices.restartStrategyFactory,
 					jobManagerServices.rpcAskTimeout,
 					jobManagerMetrics,
+					resourceID,
+					jobManagerHeartbeatManager,
 					this,
 					this,
 					userCodeLoader);

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 145216d..16c243c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -50,6 +50,9 @@ import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatListener;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.Slot;
@@ -146,6 +149,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** The metrics for the job */
 	private final MetricGroup jobMetricGroup;
 
+	/** The heartbeat manager with task managers */
+	private final HeartbeatManagerImpl<Void, Void> heartbeatManager;
+
 	/** The execution context which is used to execute futures */
 	private final ExecutorService executionContext;
 
@@ -164,6 +170,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	private volatile UUID leaderSessionID;
 
+	private final ResourceID resourceID;
+
 	// --------- ResourceManager --------
 
 	/** Leader retriever service used to locate ResourceManager's address */
@@ -188,6 +196,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			RestartStrategyFactory restartStrategyFactory,
 			Time rpcAskTimeout,
 			@Nullable JobManagerMetricGroup jobManagerMetricGroup,
+			ResourceID resourceID,
+			HeartbeatManagerImpl<Void, Void> heartbeatManager,
 			OnCompletionActions jobCompletionActions,
 			FatalErrorHandler errorHandler,
 			ClassLoader userCodeLoader) throws Exception
@@ -203,6 +213,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
 		this.errorHandler = checkNotNull(errorHandler);
 		this.userCodeLoader = checkNotNull(userCodeLoader);
+		this.resourceID = checkNotNull(resourceID);
+		this.heartbeatManager = checkNotNull(heartbeatManager);
 
 		final String jobName = jobGraph.getName();
 		final JobID jid = jobGraph.getJobID();
@@ -276,6 +288,27 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			// make sure we receive RPC and async calls
 			super.start();
 
+			heartbeatManager.start(new HeartbeatListener<Void, Void>() {
+				@Override
+				public void notifyHeartbeatTimeout(ResourceID resourceID) {
+					log.info("Notify heartbeat timeout with task manager {}", resourceID);
+					heartbeatManager.unmonitorTarget(resourceID);
+
+					getSelf().disconnectTaskManager(resourceID);
+				}
+
+				@Override
+				public void reportPayload(ResourceID resourceID, Void payload) {
+					// currently there is no payload from task manager and resource manager, so this method will not be called.
+				}
+
+				@Override
+				public Future<Void> retrievePayload() {
+					// currently no payload is needed.
+					return null;
+				}
+			});
+
 			log.info("JobManager started as leader {} for job {}", leaderSessionID, jobGraph.getJobID());
 			getSelf().startJobExecution();
 		}
@@ -290,6 +323,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	@Override
 	public void shutDown() throws Exception {
 		// make sure there is a graceful exit
+		heartbeatManager.stop();
 		getSelf().suspendExecution(new Exception("JobManager is shutting down."));
 		super.shutDown();
 	}
@@ -512,7 +546,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	@RpcMethod
 	public void disconnectTaskManager(final ResourceID resourceID) {
-		throw new UnsupportedOperationException();
+		registeredTaskManagers.remove(resourceID);
+		slotPoolGateway.releaseTaskManager(resourceID);
 	}
 
 	// TODO: This method needs a leader session ID
@@ -708,7 +743,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		if (registeredTaskManagers.containsKey(taskManagerId)) {
 			final RegistrationResponse response = new JMTMRegistrationSuccess(
-					taskManagerId, libraryCacheManager.getBlobServerPort());
+					resourceID, libraryCacheManager.getBlobServerPort());
 			return FlinkCompletableFuture.completed(response);
 		} else {
 			return getRpcService().execute(new Callable<TaskExecutorGateway>() {
@@ -719,7 +754,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				}
 			}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
 				@Override
-				public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+				public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
 					if (throwable != null) {
 						return new RegistrationResponse.Decline(throwable.getMessage());
 					}
@@ -734,7 +769,21 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 					slotPoolGateway.registerTaskManager(taskManagerId);
 					registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
-					return new JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort());
+
+					// monitor the task manager as heartbeat target
+					heartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
+						@Override
+						public void sendHeartbeat(ResourceID resourceID, Void payload) {
+							// the task manager will not request heartbeat, so this method will never be called currently
+						}
+
+						@Override
+						public void requestHeartbeat(ResourceID resourceID, Void payload) {
+							taskExecutorGateway.heartbeatFromJobManager(resourceID);
+						}
+					});
+
+					return new JMTMRegistrationSuccess(resourceID, libraryCacheManager.getBlobServerPort());
 				}
 			}, getMainThreadExecutor());
 		}
@@ -748,6 +797,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		// TODO: Implement disconnect behaviour
 	}
 
+	@RpcMethod
+	public void heartbeatFromTaskManager(final ResourceID resourceID) {
+		heartbeatManager.sendHeartbeat(resourceID, null);
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Internal methods
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index de7646b..e7e3111 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -218,4 +218,11 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 			final TaskManagerLocation taskManagerLocation,
 			final UUID leaderId,
 			@RpcTimeout final Time timeout);
+
+	/**
+	 * Sends a heartbeat from the task manager to the job manager.
+	 *
+	 * @param resourceID unique id of the task manager
+	 */
+	void heartbeatFromTaskManager(final ResourceID resourceID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index 2b224bc..98c7bf1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -33,6 +35,12 @@ import java.util.UUID;
  */
 public class JobManagerConnection {
 
+	// Job id related with the job manager
+	private final JobID jobID;
+
+	// The unique id used for identifying the job manager
+	private final ResourceID resourceID;
+
 	// Job master leader session id
 	private final UUID leaderId;
 
@@ -55,6 +63,8 @@ public class JobManagerConnection {
 	private final PartitionProducerStateChecker partitionStateChecker;
 
 	public JobManagerConnection(
+		JobID jobID,
+		ResourceID resourceID,
 		JobMasterGateway jobMasterGateway,
 		UUID leaderId,
 		TaskManagerActions taskManagerActions,
@@ -62,6 +72,8 @@ public class JobManagerConnection {
 		LibraryCacheManager libraryCacheManager,
 		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
 		PartitionProducerStateChecker partitionStateChecker) {
+		this.jobID = Preconditions.checkNotNull(jobID);
+		this.resourceID = Preconditions.checkNotNull(resourceID);
 		this.leaderId = Preconditions.checkNotNull(leaderId);
 		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
 		this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
@@ -71,6 +83,14 @@ public class JobManagerConnection {
 		this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
 	}
 
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
+
 	public UUID getLeaderId() {
 		return leaderId;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index df5765a..e601b0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -38,6 +38,9 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.heartbeat.HeartbeatListener;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -127,6 +130,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	/** The metric registry in the task manager */
 	private final MetricRegistry metricRegistry;
 
+	/** The heartbeat manager for job manager in the task manager */
+	private final HeartbeatManagerImpl<Void, Void> jobManagerHeartbeatManager;
+
 	/** The fatal error handler to use in case of a fatal error */
 	private final FatalErrorHandler fatalErrorHandler;
 
@@ -163,6 +169,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		NetworkEnvironment networkEnvironment,
 		HighAvailabilityServices haServices,
 		MetricRegistry metricRegistry,
+		HeartbeatManagerImpl<Void, Void> jobManagerHeartbeatManager,
 		TaskManagerMetricGroup taskManagerMetricGroup,
 		BroadcastVariableManager broadcastVariableManager,
 		FileCache fileCache,
@@ -182,6 +189,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		this.networkEnvironment = checkNotNull(networkEnvironment);
 		this.haServices = checkNotNull(haServices);
 		this.metricRegistry = checkNotNull(metricRegistry);
+		this.jobManagerHeartbeatManager = checkNotNull(jobManagerHeartbeatManager);
 		this.taskSlotTable = checkNotNull(taskSlotTable);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 		this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
@@ -213,6 +221,38 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		// start the job leader service
 		jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
+
+		// start the heartbeat manager for monitoring job manager
+		jobManagerHeartbeatManager.start(new HeartbeatListener<Void, Void>() {
+			@Override
+			public void notifyHeartbeatTimeout(final ResourceID resourceID) {
+				runAsync(new Runnable() {
+					@Override
+					public void run() {
+						log.info("Notify heartbeat timeout with job manager {}", resourceID);
+						jobManagerHeartbeatManager.unmonitorTarget(resourceID);
+
+						if (jobManagerConnections.containsKey(resourceID)) {
+							JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
+							if (jobManagerConnection != null) {
+								closeJobManagerConnection(jobManagerConnection.getJobID());
+							}
+						}
+					}
+				});
+			}
+
+			@Override
+			public void reportPayload(ResourceID resourceID, Void payload) {
+				// currently there is no payload from the job manager, so this method will not be called.
+			}
+
+			@Override
+			public Future<Void> retrievePayload() {
+				// currently no payload is needed.
+				return null;
+			}
+		});
 	}
 
 	/**
@@ -230,6 +270,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			resourceManagerConnection.close();
 		}
 
+		jobManagerHeartbeatManager.stop();
+
 		ioManager.shutdown();
 
 		memoryManager.shutdown();
@@ -472,6 +514,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	}
 
 	// ----------------------------------------------------------------------
+	// Heartbeat RPC
+	// ----------------------------------------------------------------------
+
+	@RpcMethod
+	public void heartbeatFromJobManager(ResourceID resourceID) {
+		jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
+	}
+
+	// ----------------------------------------------------------------------
 	// Checkpointing RPCs
 	// ----------------------------------------------------------------------
 
@@ -729,20 +780,39 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, JMTMRegistrationSuccess registrationSuccess) {
+	private void establishJobManagerConnection(JobID jobId, final JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, JMTMRegistrationSuccess registrationSuccess) {
 		log.info("Establish JobManager connection for job {}.", jobId);
 
 		if (jobManagerTable.contains(jobId)) {
 			JobManagerConnection oldJobManagerConnection = jobManagerTable.get(jobId);
-
 			if (!oldJobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
 				closeJobManagerConnection(jobId);
-				jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
 			}
-		} else {
-			jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
 		}
 
+		ResourceID jobManagerResourceID = registrationSuccess.getResourceID();
+		JobManagerConnection newJobManagerConnection = associateWithJobManager(
+				jobId,
+				jobManagerResourceID,
+				jobMasterGateway,
+				jobManagerLeaderId,
+				registrationSuccess.getBlobPort());
+		jobManagerConnections.put(jobManagerResourceID, newJobManagerConnection);
+		jobManagerTable.put(jobId, newJobManagerConnection);
+
+		// monitor the job manager as heartbeat target
+		jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<Void>() {
+			@Override
+			public void sendHeartbeat(ResourceID resourceID, Void payload) {
+				jobMasterGateway.heartbeatFromTaskManager(resourceID);
+			}
+
+			@Override
+			public void requestHeartbeat(ResourceID resourceID, Void payload) {
+				// requestHeartbeat will never be called on the task manager side
+			}
+		});
+
 		offerSlotsToJobManager(jobId);
 	}
 
@@ -777,6 +847,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		if (jobManagerConnection != null) {
 			try {
+				jobManagerConnections.remove(jobManagerConnection.getResourceID());
 				disassociateFromJobManager(jobManagerConnection);
 			} catch (IOException e) {
 				log.warn("Could not properly disassociate from JobManager {}.",
@@ -785,7 +856,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, int blobPort) {
+	private JobManagerConnection associateWithJobManager(
+			JobID jobID,
+			ResourceID resourceID,
+			JobMasterGateway jobMasterGateway,
+			UUID jobManagerLeaderId,
+			int blobPort) {
+		Preconditions.checkNotNull(jobID);
+		Preconditions.checkNotNull(resourceID);
 		Preconditions.checkNotNull(jobManagerLeaderId);
 		Preconditions.checkNotNull(jobMasterGateway);
 		Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob server port is out of range.");
@@ -820,6 +898,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobManagerLeaderId, jobMasterGateway);
 
 		return new JobManagerConnection(
+			jobID,
+			resourceID,
 			jobMasterGateway,
 			jobManagerLeaderId,
 			taskManagerActions,

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 36a3255..95db932 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
@@ -131,4 +132,11 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @return Future acknowledge if the task is successfully canceled
 	 */
 	Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
+
+	/**
+	 * Heartbeat request sent by the job manager
+	 *
+	 * @param resourceID unique id of the job manager
+	 */
+	void heartbeatFromJobManager(ResourceID resourceID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 3500f6d..402421c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -112,6 +114,13 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		// Initialize the TM metrics
 		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
 
+		HeartbeatManagerImpl<Void, Void> heartbeatManager = new HeartbeatManagerImpl<>(
+				configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT),
+				resourceID,
+				executor,
+				rpcService.getScheduledExecutor(),
+				LOG);
+
 		this.taskManager = new TaskExecutor(
 			taskManagerConfiguration,
 			taskManagerServices.getTaskManagerLocation(),
@@ -121,6 +130,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			taskManagerServices.getNetworkEnvironment(),
 			highAvailabilityServices,
 			metricRegistry,
+			heartbeatManager,
 			taskManagerMetricGroup,
 			taskManagerServices.getBroadcastVariableManager(),
 			taskManagerServices.getFileCache(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
index 0a8923d..3da18aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.heartbeat;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.util.TestLogger;
@@ -63,7 +65,7 @@ public class HeartbeatManagerTest extends TestLogger {
 		ResourceID ownResourceID = new ResourceID("foobar");
 		ResourceID targetResourceID = new ResourceID("barfoo");
 		HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
-		ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+		ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 
 		Object expectedObject = new Object();
 
@@ -74,7 +76,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			ownResourceID,
 			heartbeatListener,
 			new DirectExecutorService(),
-			scheduledExecutorService,
+			scheduledExecutor,
 			LOG);
 
 		HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
@@ -101,10 +103,10 @@ public class HeartbeatManagerTest extends TestLogger {
 		ResourceID ownResourceID = new ResourceID("foobar");
 		ResourceID targetResourceID = new ResourceID("barfoo");
 		HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
-		ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+		ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 		ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
 
-		doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+		doReturn(scheduledFuture).when(scheduledExecutor).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
 
 		Object expectedObject = new Object();
 
@@ -115,7 +117,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			ownResourceID,
 			heartbeatListener,
 			new DirectExecutorService(),
-			scheduledExecutorService,
+			scheduledExecutor,
 			LOG);
 
 		HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
@@ -125,7 +127,7 @@ public class HeartbeatManagerTest extends TestLogger {
 		heartbeatManager.receiveHeartbeat(targetResourceID, expectedObject);
 
 		verify(scheduledFuture, times(1)).cancel(true);
-		verify(scheduledExecutorService, times(2)).schedule(any(Runnable.class), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
+		verify(scheduledExecutor, times(2)).schedule(any(Runnable.class), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
 	}
 
 	/**
@@ -155,7 +157,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			ownResourceID,
 			heartbeatListener,
 			new DirectExecutorService(),
-			new ScheduledThreadPoolExecutor(1),
+			new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
 			LOG);
 
 		HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
@@ -205,7 +207,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			resourceID,
 			heartbeatListener,
 			new DirectExecutorService(),
-			new ScheduledThreadPoolExecutor(1),
+			new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
 			LOG);
 
 		HeartbeatManagerSenderImpl<Object, Object> heartbeatManager2 = new HeartbeatManagerSenderImpl<>(
@@ -214,7 +216,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			resourceID2,
 			heartbeatListener2,
 			new DirectExecutorService(),
-			new ScheduledThreadPoolExecutor(1),
+			new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
 			LOG);;
 
 		heartbeatManager.monitorTarget(resourceID2, heartbeatManager2);
@@ -254,7 +256,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			resourceID,
 			heartbeatListener,
 			new DirectExecutorService(),
-			new ScheduledThreadPoolExecutor(1),
+			new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
 			LOG);
 
 		heartbeatManager.monitorTarget(targetID, mock(HeartbeatTarget.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
new file mode 100644
index 0000000..cdad87f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatManagerSenderImpl;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.SlotPoolGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(BlobLibraryCacheManager.class)
+public class JobMasterTest extends TestLogger {
+
+	@Test
+	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		final String jobManagerAddress = "jm";
+		final UUID jmLeaderId = UUID.randomUUID();
+		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
+
+		final String taskManagerAddress = "tm";
+		final ResourceID tmResourceId = new ResourceID(taskManagerAddress);
+		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+
+		final TestingSerialRpcService rpc = new TestingSerialRpcService();
+		rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
+
+		final long heartbeatInterval = 1L;
+		final long heartbeatTimeout = 5L;
+		final CountDownLatch waitLatch = new CountDownLatch(1);
+		final HeartbeatManagerSenderImpl<Void, Void> jmHeartbeatManager = new TestingHeartbeatManagerSenderImpl<>(
+				waitLatch,
+				heartbeatInterval,
+				heartbeatTimeout,
+				jmResourceId,
+				rpc.getExecutor(),
+				rpc.getScheduledExecutor(),
+				log);
+
+		try {
+			final JobMaster jobMaster = new JobMaster(
+					new JobGraph(),
+					new Configuration(),
+					rpc,
+					haServices,
+					Executors.newScheduledThreadPool(1),
+					mock(BlobLibraryCacheManager.class),
+					mock(RestartStrategyFactory.class),
+					Time.of(10, TimeUnit.SECONDS),
+					null,
+					jmResourceId,
+					jmHeartbeatManager,
+					mock(OnCompletionActions.class),
+					testingFatalErrorHandler,
+					new FlinkUserCodeClassLoader(new URL[0]));
+
+			// also start the heartbeat manager in job manager
+			jobMaster.start(jmLeaderId);
+
+			// registering the task manager triggers monitoring it as a heartbeat target and schedules heartbeat requests at the heartbeat interval
+			jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId);
+
+			verify(taskExecutorGateway, atLeast(1)).heartbeatFromJobManager(eq(jmResourceId));
+
+			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(jmHeartbeatManager, "heartbeatTargets");
+			final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTMsInJM = Whitebox.getInternalState(jobMaster, "registeredTaskManagers");
+			final SlotPoolGateway slotPoolGateway = mock(SlotPoolGateway.class);
+			Whitebox.setInternalState(jobMaster, "slotPoolGateway", slotPoolGateway);
+
+			// before heartbeat timeout
+			assertTrue(heartbeatTargets.containsKey(tmResourceId));
+			assertTrue(registeredTMsInJM.containsKey(tmResourceId));
+
+			// continue to unmonitor heartbeat target
+			waitLatch.countDown();
+
+			// after heartbeat timeout
+			verify(slotPoolGateway, timeout(heartbeatTimeout * 5)).releaseTaskManager(eq(tmResourceId));
+			assertFalse(heartbeatTargets.containsKey(tmResourceId));
+			assertFalse(registeredTMsInJM.containsKey(tmResourceId));
+
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowError();
+
+		} finally {
+			rpc.stopService();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 076d126..5ffc97e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -103,6 +104,7 @@ public class TaskExecutorITCase {
 			rpcService.getScheduledExecutor(),
 			resourceManagerConfiguration.getJobTimeout());
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
+		HeartbeatManagerImpl heartbeatManager = mock(HeartbeatManagerImpl.class);
 
 		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234);
@@ -134,6 +136,7 @@ public class TaskExecutorITCase {
 			networkEnvironment,
 			testingHAServices,
 			metricRegistry,
+			heartbeatManager,
 			taskManagerMetricGroup,
 			broadcastVariableManager,
 			fileCache,

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index d413a01..f500246 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -39,6 +39,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatManagerImpl;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -80,31 +82,131 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.Matchers;
+import org.powermock.reflect.Whitebox;
 
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 public class TaskExecutorTest extends TestLogger {
 
 	@Rule
 	public TestName name = new TestName();
 
+	@Test
+	public void testHeartbeatTimeoutWithJobManager() throws Exception {
+		final JobID jobId = new JobID();
+		final Configuration configuration = new Configuration();
+		final TaskManagerConfiguration tmConfig = TaskManagerConfiguration.fromConfiguration(configuration);
+		final ResourceID tmResourceId = new ResourceID("tm");
+		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
+		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), mock(TimerService.class));
+
+		final TestingSerialRpcService rpc = new TestingSerialRpcService();
+		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+		final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService();
+		haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);
+		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		final CountDownLatch waitLatch =  new CountDownLatch(1);
+		final long heartbeatTimeout = 10L;
+		final HeartbeatManagerImpl<Void, Void> tmHeartbeatManager = new TestingHeartbeatManagerImpl<>(
+				waitLatch,
+				heartbeatTimeout,
+				tmResourceId,
+				rpc.getExecutor(),
+				rpc.getScheduledExecutor(),
+				log);
+
+		final String jobMasterAddress = "jm";
+		final UUID jmLeaderId = UUID.randomUUID();
+		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
+		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+		final int blobPort = 42;
+
+		when(jobMasterGateway.registerTaskManager(
+				any(String.class),
+				eq(taskManagerLocation),
+				eq(jmLeaderId),
+				any(Time.class)
+		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
+
+		try {
+			final TaskExecutor taskManager = new TaskExecutor(
+					tmConfig,
+					taskManagerLocation,
+					rpc,
+					mock(MemoryManager.class),
+					mock(IOManager.class),
+					mock(NetworkEnvironment.class),
+					haServices,
+					mock(MetricRegistry.class),
+					tmHeartbeatManager,
+					mock(TaskManagerMetricGroup.class),
+					mock(BroadcastVariableManager.class),
+					mock(FileCache.class),
+					taskSlotTable,
+					new JobManagerTable(),
+					jobLeaderService,
+					testingFatalErrorHandler);
+
+			taskManager.start();
+
+			rpc.registerGateway(jobMasterAddress, jobMasterGateway);
+
+			// we have to add the job after the TaskExecutor, because otherwise the service has not
+			// been properly started.
+			jobLeaderService.addJob(jobId, jobMasterAddress);
+
+			// now inform the task manager about the new job leader
+			jmLeaderRetrievalService.notifyListener(jobMasterAddress, jmLeaderId);
+
+			// a successful task manager registration triggers heartbeat monitoring between the TM and the JM
+			verify(jobMasterGateway).registerTaskManager(
+					eq(taskManager.getAddress()), eq(taskManagerLocation), eq(jmLeaderId), any(Time.class));
+
+			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(tmHeartbeatManager, "heartbeatTargets");
+			final JobManagerTable jobManagerTable = Whitebox.getInternalState(taskManager, "jobManagerTable");
+			final Map<ResourceID, JobManagerConnection> jobManagerConnections = Whitebox.getInternalState(taskManager, "jobManagerConnections");
+
+			// before heartbeat timeout
+			assertTrue(heartbeatTargets.containsKey(jmResourceId));
+			assertTrue(jobManagerTable.contains(jobId));
+			assertTrue(jobManagerConnections.containsKey(jmResourceId));
+
+			// continue to unmonitor heartbeat target
+			waitLatch.countDown();
+
+			// after heartbeat timeout
+			verify(jobMasterGateway, timeout(heartbeatTimeout)).disconnectTaskManager(eq(tmResourceId));
+			assertFalse(heartbeatTargets.containsKey(jmResourceId));
+			assertFalse(jobManagerTable.contains(jobId));
+			assertFalse(jobManagerConnections.containsKey(jmResourceId));
+
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowError();
+
+		} finally {
+			rpc.stopService();
+		}
+	}
 
 	@Test
 	public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
@@ -146,6 +248,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -222,6 +325,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -310,6 +414,8 @@ public class TaskExecutorTest extends TestLogger {
 		when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
 
 		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
+			jobId,
+			ResourceID.generate(),
 			mock(JobMasterGateway.class),
 			jobManagerLeaderId,
 			mock(TaskManagerActions.class),
@@ -351,6 +457,7 @@ public class TaskExecutorTest extends TestLogger {
 				networkEnvironment,
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				taskManagerMetricGroup,
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -457,6 +564,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -570,6 +678,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -644,6 +753,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -764,6 +874,8 @@ public class TaskExecutorTest extends TestLogger {
 		when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
 
 		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
+			jobId,
+			jmResourceId,
 			jobMasterGateway,
 			jobManagerLeaderId,
 			mock(TaskManagerActions.class),
@@ -784,6 +896,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),


[2/2] flink git commit: [FLINK-4364] Introduce HeartbeatServices for the HeartbeatManager instantiation

Posted by tr...@apache.org.
[FLINK-4364] Introduce HeartbeatServices for the HeartbeatManager instantiation

The HeartbeatServices are used to create all services relevant for heartbeating. This
includes at the moment the creation of HeartbeatManager implementations which actively
send heartbeats and those which only respond to heartbeat requests.

Add comments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97ccc147
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97ccc147
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97ccc147

Branch: refs/heads/master
Commit: 97ccc147382d866bc3e82caf9b87f7cd2e5989f9
Parents: 0b3d5c2
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Mar 7 22:45:42 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Mar 17 17:00:05 2017 +0100

----------------------------------------------------------------------
 .../configuration/HeartbeatManagerOptions.java  |   2 +-
 .../runtime/heartbeat/HeartbeatServices.java    | 116 ++++++++++++++++++
 .../heartbeat/TestingHeartbeatManagerImpl.java  |   3 +-
 .../TestingHeartbeatManagerSenderImpl.java      |   3 +-
 .../runtime/jobmaster/JobManagerRunner.java     |  83 +++++++------
 .../flink/runtime/jobmaster/JobMaster.java      | 106 +++++++++-------
 .../runtime/jobmaster/JobMasterGateway.java     |   3 +-
 .../flink/runtime/minicluster/MiniCluster.java  |  14 ++-
 .../minicluster/MiniClusterJobDispatcher.java   |  24 +++-
 .../runtime/taskexecutor/TaskExecutor.java      | 105 +++++++++-------
 .../taskexecutor/TaskExecutorGateway.java       |  14 ++-
 .../runtime/taskexecutor/TaskManagerRunner.java |  35 +++---
 .../jobmaster/JobManagerRunnerMockTest.java     |   6 +
 .../flink/runtime/jobmaster/JobMasterTest.java  | 120 ++++++++++++-------
 .../taskexecutor/TaskExecutorITCase.java        |   7 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 110 ++++++++---------
 .../yarn/YarnFlinkApplicationMasterRunner.java  |  20 +++-
 .../flink/yarn/YarnTaskExecutorRunner.java      |  11 +-
 18 files changed, 515 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
index 2258eb1..81cbc5d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
@@ -30,7 +30,7 @@ public class HeartbeatManagerOptions {
 
 	/** Time interval for requesting heartbeat from sender side */
 	public static final ConfigOption<Long> HEARTBEAT_INTERVAL =
-			key("heartbeat.sender.interval")
+			key("heartbeat.interval")
 			.defaultValue(10000L);
 
 	/** Timeout for requesting and receiving heartbeat for both sender and receiver sides */

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java
new file mode 100644
index 0000000..7d55b9c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+/**
+ * HeartbeatServices gives access to all services needed for heartbeating. This includes the
+ * creation of heartbeat receivers and heartbeat senders.
+ */
+public class HeartbeatServices {
+
+	/** Heartbeat interval for the created services */
+	protected final long heartbeatInterval;
+
+	/** Heartbeat timeout for the created services */
+	protected final long heartbeatTimeout;
+
+	public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
+		Preconditions.checkArgument(0L < heartbeatInterval, "The heartbeat interval must be larger than 0.");
+		Preconditions.checkArgument(heartbeatInterval <= heartbeatTimeout, "The heartbeat timeout should be larger than or equal to the heartbeat interval.");
+
+		this.heartbeatInterval = heartbeatInterval;
+		this.heartbeatTimeout = heartbeatTimeout;
+	}
+
+	/**
+	 * Creates a heartbeat manager which does not actively send heartbeats.
+	 *
+	 * @param resourceId Resource Id which identifies the owner of the heartbeat manager
+	 * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
+	 *                          targets
+	 * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
+	 * @param log Logger to be used for the logging
+	 * @param <I> Type of the incoming payload
+	 * @param <O> Type of the outgoing payload
+	 * @return A new HeartbeatManager instance
+	 */
+	public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
+		ResourceID resourceId,
+		HeartbeatListener<I, O> heartbeatListener,
+		ScheduledExecutor scheduledExecutor,
+		Logger log) {
+
+		return new HeartbeatManagerImpl<>(
+			heartbeatTimeout,
+			resourceId,
+			heartbeatListener,
+			scheduledExecutor,
+			scheduledExecutor,
+			log);
+	}
+
+	/**
+	 * Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
+	 *
+	 * @param resourceId Resource Id which identifies the owner of the heartbeat manager
+	 * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
+	 *                          targets
+	 * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
+	 * @param log Logger to be used for the logging
+	 * @param <I> Type of the incoming payload
+	 * @param <O> Type of the outgoing payload
+	 * @return A new HeartbeatManager instance which actively sends heartbeats
+	 */
+	public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+		ResourceID resourceId,
+		HeartbeatListener<I, O> heartbeatListener,
+		ScheduledExecutor scheduledExecutor,
+		Logger log) {
+
+		return new HeartbeatManagerSenderImpl<>(
+			heartbeatInterval,
+			heartbeatTimeout,
+			resourceId,
+			heartbeatListener,
+			scheduledExecutor,
+			scheduledExecutor,
+			log);
+	}
+
+	/**
+	 * Creates a HeartbeatServices instance from a {@link Configuration}.
+	 *
+	 * @param configuration Configuration to be used for the HeartbeatServices creation
+	 * @return A HeartbeatServices instance created from the given configuration
+	 */
+	public static HeartbeatServices fromConfiguration(Configuration configuration) {
+		long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
+
+		long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);
+
+		return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
index 1238f1a..a6e056d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
@@ -40,11 +40,12 @@ public class TestingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O
 			CountDownLatch waitLatch,
 			long heartbeatTimeoutIntervalMs,
 			ResourceID ownResourceID,
+			HeartbeatListener<I, O> heartbeatListener,
 			Executor executor,
 			ScheduledExecutor scheduledExecutor,
 			Logger log) {
 
-		super(heartbeatTimeoutIntervalMs, ownResourceID, executor, scheduledExecutor, log);
+		super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
 
 		this.waitLatch = waitLatch;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
index 7000895..36f7e96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
@@ -38,11 +38,12 @@ public class TestingHeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerSen
 			long heartbeatPeriod,
 			long heartbeatTimeout,
 			ResourceID ownResourceID,
+			HeartbeatListener<I, O> heartbeatListener,
 			Executor executor,
 			ScheduledExecutor scheduledExecutor,
 			Logger log) {
 
-		super(heartbeatPeriod, heartbeatTimeout, ownResourceID, executor, scheduledExecutor, log);
+		super(heartbeatPeriod, heartbeatTimeout, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
 
 		this.waitLatch = waitLatch;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index eced869..33ee29d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,11 +21,10 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
@@ -88,31 +87,47 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	// ------------------------------------------------------------------------
 
 	public JobManagerRunner(
+			final ResourceID resourceId,
 			final JobGraph jobGraph,
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
+			final HeartbeatServices heartbeatServices,
 			final OnCompletionActions toNotifyOnComplete,
-			final FatalErrorHandler errorHandler) throws Exception
-	{
-		this(jobGraph, configuration, rpcService, haServices,
-				new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
-				toNotifyOnComplete, errorHandler);
+			final FatalErrorHandler errorHandler) throws Exception {
+		this(
+			resourceId,
+			jobGraph,
+			configuration,
+			rpcService,
+			haServices,
+			heartbeatServices,
+			new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
+			toNotifyOnComplete,
+			errorHandler);
 	}
 
 	public JobManagerRunner(
+			final ResourceID resourceId,
 			final JobGraph jobGraph,
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
+			final HeartbeatServices heartbeatServices,
 			final MetricRegistry metricRegistry,
 			final OnCompletionActions toNotifyOnComplete,
-			final FatalErrorHandler errorHandler) throws Exception
-	{
-		this(jobGraph, configuration, rpcService, haServices,
-				JobManagerServices.fromConfiguration(configuration, haServices),
-				metricRegistry,
-				toNotifyOnComplete, errorHandler);
+			final FatalErrorHandler errorHandler) throws Exception {
+		this(
+			resourceId,
+			jobGraph,
+			configuration,
+			rpcService,
+			haServices,
+			heartbeatServices,
+			JobManagerServices.fromConfiguration(configuration, haServices),
+			metricRegistry,
+			toNotifyOnComplete,
+			errorHandler);
 	}
 
 	/**
@@ -127,15 +142,16 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	 *                   required services could not be started, ot the Job could not be initialized.
 	 */
 	public JobManagerRunner(
+			final ResourceID resourceId,
 			final JobGraph jobGraph,
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
+			final HeartbeatServices heartbeatServices,
 			final JobManagerServices jobManagerServices,
 			final MetricRegistry metricRegistry,
 			final OnCompletionActions toNotifyOnComplete,
-			final FatalErrorHandler errorHandler) throws Exception
-	{
+			final FatalErrorHandler errorHandler) throws Exception {
 
 		JobManagerMetricGroup jobManagerMetrics = null;
 
@@ -170,31 +186,22 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			this.runningJobsRegistry = haServices.getRunningJobsRegistry();
 			this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
 
-			// heartbeat manager last
-			final ResourceID resourceID = ResourceID.generate();
-			final HeartbeatManagerSenderImpl<Void, Void> jobManagerHeartbeatManager = new HeartbeatManagerSenderImpl<>(
-					configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL),
-					configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT),
-					resourceID,
-					rpcService.getExecutor(),
-					rpcService.getScheduledExecutor(),
-					log);
-
 			// now start the JobManager
 			this.jobManager = new JobMaster(
-					jobGraph, configuration,
-					rpcService,
-					haServices,
-					jobManagerServices.executorService,
-					jobManagerServices.libraryCacheManager,
-					jobManagerServices.restartStrategyFactory,
-					jobManagerServices.rpcAskTimeout,
-					jobManagerMetrics,
-					resourceID,
-					jobManagerHeartbeatManager,
-					this,
-					this,
-					userCodeLoader);
+				resourceId,
+				jobGraph,
+				configuration,
+				rpcService,
+				haServices,
+				heartbeatServices,
+				jobManagerServices.executorService,
+				jobManagerServices.libraryCacheManager,
+				jobManagerServices.restartStrategyFactory,
+				jobManagerServices.rpcAskTimeout,
+				jobManagerMetrics,
+				this,
+				this,
+				userCodeLoader);
 		}
 		catch (Throwable t) {
 			// clean up everything

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 16c243c..243b57f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -51,7 +51,8 @@ import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatManager;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
@@ -105,8 +106,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -129,6 +130,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	// ------------------------------------------------------------------------
 
+	private final ResourceID resourceId;
+
 	/** Logical representation of the job */
 	private final JobGraph jobGraph;
 
@@ -150,10 +153,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	private final MetricGroup jobMetricGroup;
 
 	/** The heartbeat manager with task managers */
-	private final HeartbeatManagerImpl<Void, Void> heartbeatManager;
+	private final HeartbeatManager<Void, Void> heartbeatManager;
 
 	/** The execution context which is used to execute futures */
-	private final ExecutorService executionContext;
+	private final Executor executor;
 
 	private final OnCompletionActions jobCompletionActions;
 
@@ -170,8 +173,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	private volatile UUID leaderSessionID;
 
-	private final ResourceID resourceID;
-
 	// --------- ResourceManager --------
 
 	/** Leader retriever service used to locate ResourceManager's address */
@@ -187,34 +188,38 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	// ------------------------------------------------------------------------
 
 	public JobMaster(
+			ResourceID resourceId,
 			JobGraph jobGraph,
 			Configuration configuration,
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityService,
-			ScheduledExecutorService executorService,
+			HeartbeatServices heartbeatServices,
+			ScheduledExecutorService executor,
 			BlobLibraryCacheManager libraryCacheManager,
 			RestartStrategyFactory restartStrategyFactory,
 			Time rpcAskTimeout,
 			@Nullable JobManagerMetricGroup jobManagerMetricGroup,
-			ResourceID resourceID,
-			HeartbeatManagerImpl<Void, Void> heartbeatManager,
 			OnCompletionActions jobCompletionActions,
 			FatalErrorHandler errorHandler,
-			ClassLoader userCodeLoader) throws Exception
-	{
+			ClassLoader userCodeLoader) throws Exception {
 		super(rpcService);
 
+		this.resourceId = checkNotNull(resourceId);
 		this.jobGraph = checkNotNull(jobGraph);
 		this.configuration = checkNotNull(configuration);
 		this.rpcTimeout = rpcAskTimeout;
 		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
-		this.executionContext = checkNotNull(executorService);
+		this.executor = checkNotNull(executor);
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
 		this.errorHandler = checkNotNull(errorHandler);
 		this.userCodeLoader = checkNotNull(userCodeLoader);
-		this.resourceID = checkNotNull(resourceID);
-		this.heartbeatManager = checkNotNull(heartbeatManager);
+
+		this.heartbeatManager = heartbeatServices.createHeartbeatManagerSender(
+			resourceId,
+			new TaskManagerHeartbeatListener(),
+			rpcService.getScheduledExecutor(),
+			log);
 
 		final String jobName = jobGraph.getName();
 		final JobID jid = jobGraph.getJobID();
@@ -251,8 +256,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			null,
 			jobGraph,
 			configuration,
-			executorService,
-			executorService,
+			executor,
+			executor,
 			slotPool.getSlotProvider(),
 			userCodeLoader,
 			checkpointRecoveryFactory,
@@ -288,27 +293,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			// make sure we receive RPC and async calls
 			super.start();
 
-			heartbeatManager.start(new HeartbeatListener<Void, Void>() {
-				@Override
-				public void notifyHeartbeatTimeout(ResourceID resourceID) {
-					log.info("Notify heartbeat timeout with task manager {}", resourceID);
-					heartbeatManager.unmonitorTarget(resourceID);
-
-					getSelf().disconnectTaskManager(resourceID);
-				}
-
-				@Override
-				public void reportPayload(ResourceID resourceID, Void payload) {
-					// currently there is no payload from task manager and resource manager, so this method will not be called.
-				}
-
-				@Override
-				public Future<Void> retrievePayload() {
-					// currently no need payload.
-					return null;
-				}
-			});
-
 			log.info("JobManager started as leader {} for job {}", leaderSessionID, jobGraph.getJobID());
 			getSelf().startJobExecution();
 		}
@@ -322,8 +306,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 */
 	@Override
 	public void shutDown() throws Exception {
-		// make sure there is a graceful exit
 		heartbeatManager.stop();
+
+		// make sure there is a graceful exit
 		getSelf().suspendExecution(new Exception("JobManager is shutting down."));
 		super.shutDown();
 	}
@@ -371,7 +356,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		}
 
 		// start scheduling job in another thread
-		executionContext.execute(new Runnable() {
+		executor.execute(new Runnable() {
 			@Override
 			public void run() {
 				try {
@@ -545,9 +530,16 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	@RpcMethod
-	public void disconnectTaskManager(final ResourceID resourceID) {
-		registeredTaskManagers.remove(resourceID);
+	public void disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
+		heartbeatManager.unmonitorTarget(resourceID);
 		slotPoolGateway.releaseTaskManager(resourceID);
+
+		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID);
+
+		if (taskManagerConnection != null) {
+			taskManagerConnection.f1.disconnectJobManager(jobGraph.getJobID(), cause);
+		}
+
 	}
 
 	// TODO: This method needs a leader session ID
@@ -743,7 +735,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		if (registeredTaskManagers.containsKey(taskManagerId)) {
 			final RegistrationResponse response = new JMTMRegistrationSuccess(
-					resourceID, libraryCacheManager.getBlobServerPort());
+				resourceId, libraryCacheManager.getBlobServerPort());
 			return FlinkCompletableFuture.completed(response);
 		} else {
 			return getRpcService().execute(new Callable<TaskExecutorGateway>() {
@@ -773,7 +765,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 					// monitor the task manager as heartbeat target
 					heartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
 						@Override
-						public void sendHeartbeat(ResourceID resourceID, Void payload) {
+						public void receiveHeartbeat(ResourceID resourceID, Void payload) {
 							// the task manager will not request heartbeat, so this method will never be called currently
 						}
 
@@ -783,7 +775,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 						}
 					});
 
-					return new JMTMRegistrationSuccess(resourceID, libraryCacheManager.getBlobServerPort());
+					return new JMTMRegistrationSuccess(resourceId, libraryCacheManager.getBlobServerPort());
 				}
 			}, getMainThreadExecutor());
 		}
@@ -799,7 +791,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	@RpcMethod
 	public void heartbeatFromTaskManager(final ResourceID resourceID) {
-		heartbeatManager.sendHeartbeat(resourceID, null);
+		heartbeatManager.receiveHeartbeat(resourceID, null);
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -903,7 +895,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
 			resourceManagerConnection = new ResourceManagerConnection(
 					log, jobGraph.getJobID(), getAddress(), leaderSessionID,
-					resourceManagerAddress, resourceManagerLeaderId, executionContext);
+					resourceManagerAddress, resourceManagerLeaderId, executor);
 			resourceManagerConnection.start();
 		}
 	}
@@ -1046,4 +1038,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			});
 		}
 	}
+
+	private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
+
+		@Override
+		public void notifyHeartbeatTimeout(ResourceID resourceID) {
+			log.info("Task manager with id {} timed out.", resourceID);
+
+			getSelf().disconnectTaskManager(
+				resourceID,
+				new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out."));
+		}
+
+		@Override
+		public void reportPayload(ResourceID resourceID, Void payload) {
+			// nothing to do since there is no payload
+		}
+
+		@Override
+		public Future<Void> retrievePayload() {
+			return FlinkCompletableFuture.completed(null);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index e7e3111..13a7372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -124,8 +124,9 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * {@link JobMaster}.
 	 *
 	 * @param resourceID identifying the TaskManager to disconnect
+	 * @param cause for the disconnection of the TaskManager
 	 */
-	void disconnectTaskManager(ResourceID resourceID);
+	void disconnectTaskManager(ResourceID resourceID, Exception cause);
 
 	/**
 	 * Disconnects the resource manager from the job manager because of the given cause.

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1933554..25c4aba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -82,6 +83,9 @@ public class MiniCluster {
 	private HighAvailabilityServices haServices;
 
 	@GuardedBy("lock")
+	private HeartbeatServices heartbeatServices;
+
+	@GuardedBy("lock")
 	private ResourceManagerRunner[] resourceManagerRunners;
 
 	@GuardedBy("lock")
@@ -232,6 +236,8 @@ public class MiniCluster {
 				LOG.info("Starting high-availability services");
 				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
 
+				heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
+
 				// bring up the ResourceManager(s)
 				LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
 				resourceManagerRunners = startResourceManagers(
@@ -245,7 +251,12 @@ public class MiniCluster {
 				// bring up the dispatcher that launches JobManagers when jobs submitted
 				LOG.info("Starting job dispatcher(s) for {} JobManger(s)", numJobManagers);
 				jobDispatcher = new MiniClusterJobDispatcher(
-						configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
+					configuration,
+					haServices,
+					heartbeatServices,
+					metricRegistry,
+					numJobManagers,
+					jobManagerRpcServices);
 			}
 			catch (Exception e) {
 				// cleanup everything
@@ -533,6 +544,7 @@ public class MiniCluster {
 				new ResourceID(UUID.randomUUID().toString()),
 				taskManagerRpcServices[i],
 				haServices,
+				heartbeatServices,
 				metricRegistry,
 				localCommunication);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index dd80ada..1f8ae80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -63,6 +65,9 @@ public class MiniClusterJobDispatcher {
 	/** services for discovery, leader election, and recovery */
 	private final HighAvailabilityServices haServices;
 
+	/** services for heartbeating */
+	private final HeartbeatServices heartbeatServices;
+
 	/** all the services that the JobManager needs, such as BLOB service, factories, etc */
 	private final JobManagerServices jobManagerServices;
 
@@ -94,8 +99,9 @@ public class MiniClusterJobDispatcher {
 			Configuration config,
 			RpcService rpcService,
 			HighAvailabilityServices haServices,
+			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry) throws Exception {
-		this(config, haServices, metricRegistry, 1, new RpcService[] { rpcService });
+		this(config, haServices, heartbeatServices, metricRegistry, 1, new RpcService[] { rpcService });
 	}
 
 	/**
@@ -113,6 +119,7 @@ public class MiniClusterJobDispatcher {
 	public MiniClusterJobDispatcher(
 			Configuration config,
 			HighAvailabilityServices haServices,
+			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			int numJobManagers,
 			RpcService[] rpcServices) throws Exception {
@@ -123,6 +130,7 @@ public class MiniClusterJobDispatcher {
 		this.configuration = checkNotNull(config);
 		this.rpcServices = rpcServices;
 		this.haServices = checkNotNull(haServices);
+		this.heartbeatServices = checkNotNull(heartbeatServices);
 		this.metricRegistry = checkNotNull(metricRegistry);
 		this.numJobManagers = numJobManagers;
 
@@ -232,9 +240,17 @@ public class MiniClusterJobDispatcher {
 		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
 		for (int i = 0; i < numJobManagers; i++) {
 			try {
-				runners[i] = new JobManagerRunner(job, configuration,
-						rpcServices[i], haServices, jobManagerServices, metricRegistry, 
-						onCompletion, errorHandler);
+				runners[i] = new JobManagerRunner(
+					ResourceID.generate(),
+					job,
+					configuration,
+					rpcServices[i],
+					haServices,
+					heartbeatServices,
+					jobManagerServices,
+					metricRegistry,
+					onCompletion,
+					errorHandler);
 				runners[i].start();
 			}
 			catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e601b0b..00a1bf8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -39,7 +40,8 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatManager;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -131,7 +133,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	private final MetricRegistry metricRegistry;
 
 	/** The heartbeat manager for job manager in the task manager */
-	private final HeartbeatManagerImpl<Void, Void> jobManagerHeartbeatManager;
+	private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
 	/** The fatal error handler to use in case of a fatal error */
 	private final FatalErrorHandler fatalErrorHandler;
@@ -168,8 +170,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		IOManager ioManager,
 		NetworkEnvironment networkEnvironment,
 		HighAvailabilityServices haServices,
+		HeartbeatServices heartbeatServices,
 		MetricRegistry metricRegistry,
-		HeartbeatManagerImpl<Void, Void> jobManagerHeartbeatManager,
 		TaskManagerMetricGroup taskManagerMetricGroup,
 		BroadcastVariableManager broadcastVariableManager,
 		FileCache fileCache,
@@ -189,7 +191,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		this.networkEnvironment = checkNotNull(networkEnvironment);
 		this.haServices = checkNotNull(haServices);
 		this.metricRegistry = checkNotNull(metricRegistry);
-		this.jobManagerHeartbeatManager = checkNotNull(jobManagerHeartbeatManager);
 		this.taskSlotTable = checkNotNull(taskSlotTable);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 		this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
@@ -199,6 +200,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		this.jobLeaderService = checkNotNull(jobLeaderService);
 
 		this.jobManagerConnections = new HashMap<>(4);
+
+		this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
+			getResourceID(),
+			new JobManagerHeartbeatListener(),
+			rpcService.getScheduledExecutor(),
+			log);
 	}
 
 	// ------------------------------------------------------------------------
@@ -221,38 +228,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		// start the job leader service
 		jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
-
-		// start the heartbeat manager for monitoring job manager
-		jobManagerHeartbeatManager.start(new HeartbeatListener<Void, Void>() {
-			@Override
-			public void notifyHeartbeatTimeout(final ResourceID resourceID) {
-				runAsync(new Runnable() {
-					@Override
-					public void run() {
-						log.info("Notify heartbeat timeout with job manager {}", resourceID);
-						jobManagerHeartbeatManager.unmonitorTarget(resourceID);
-
-						if (jobManagerConnections.containsKey(resourceID)) {
-							JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
-							if (jobManagerConnection != null) {
-								closeJobManagerConnection(jobManagerConnection.getJobID());
-							}
-						}
-					}
-				});
-			}
-
-			@Override
-			public void reportPayload(ResourceID resourceID, Void payload) {
-				// currently there is no payload from job manager, so this method will not be called.
-			}
-
-			@Override
-			public Future<Void> retrievePayload() {
-				// currently no need payload.
-				return null;
-			}
-		});
 	}
 
 	/**
@@ -644,6 +619,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		return new TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId);
 	}
 
+	@RpcMethod
+	public void disconnectJobManager(JobID jobId, Exception cause) {
+		closeJobManagerConnection(jobId, cause);
+	}
+
 	// ======================================================================
 	//  Internal methods
 	// ======================================================================
@@ -786,7 +766,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		if (jobManagerTable.contains(jobId)) {
 			JobManagerConnection oldJobManagerConnection = jobManagerTable.get(jobId);
 			if (!oldJobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
-				closeJobManagerConnection(jobId);
+				closeJobManagerConnection(jobId, new Exception("Found new job leader for job id " + jobId + '.'));
 			}
 		}
 
@@ -803,20 +783,20 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		// monitor the job manager as heartbeat target
 		jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<Void>() {
 			@Override
-			public void sendHeartbeat(ResourceID resourceID, Void payload) {
+			public void receiveHeartbeat(ResourceID resourceID, Void payload) {
 				jobMasterGateway.heartbeatFromTaskManager(resourceID);
 			}
 
 			@Override
 			public void requestHeartbeat(ResourceID resourceID, Void payload) {
-				// request heartbeat will never be called in task manager side
+				// request heartbeat will never be called on the task manager side
 			}
 		});
 
 		offerSlotsToJobManager(jobId);
 	}
 
-	private void closeJobManagerConnection(JobID jobId) {
+	private void closeJobManagerConnection(JobID jobId, Exception cause) {
 		log.info("Close JobManager connection for job {}.", jobId);
 
 		// 1. fail tasks running under this JobID
@@ -847,8 +827,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		if (jobManagerConnection != null) {
 			try {
+				jobManagerHeartbeatManager.unmonitorTarget(jobManagerConnection.getResourceID());
+
 				jobManagerConnections.remove(jobManagerConnection.getResourceID());
-				disassociateFromJobManager(jobManagerConnection);
+				disassociateFromJobManager(jobManagerConnection, cause);
 			} catch (IOException e) {
 				log.warn("Could not properly disassociate from JobManager {}.",
 					jobManagerConnection.getJobManagerGateway().getAddress(), e);
@@ -909,10 +891,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			partitionStateChecker);
 	}
 
-	private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException {
+	private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException {
 		Preconditions.checkNotNull(jobManagerConnection);
 		JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
-		jobManagerGateway.disconnectTaskManager(getResourceID());
+		jobManagerGateway.disconnectTaskManager(getResourceID(), cause);
 		jobManagerConnection.getLibraryCacheManager().shutdown();
 	}
 
@@ -1138,7 +1120,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			runAsync(new Runnable() {
 				@Override
 				public void run() {
-					closeJobManagerConnection(jobId);
+					closeJobManagerConnection(
+						jobId,
+						new Exception("Job leader for job id " + jobId + " lost leadership."));
 				}
 			});
 		}
@@ -1220,4 +1204,37 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			});
 		}
 	}
+
+	private class JobManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
+
+		@Override
+		public void notifyHeartbeatTimeout(final ResourceID resourceID) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.info("The JobManager connection {} has timed out.", resourceID);
+
+					if (jobManagerConnections.containsKey(resourceID)) {
+						JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
+						if (jobManagerConnection != null) {
+							closeJobManagerConnection(
+								jobManagerConnection.getJobID(),
+								new TimeoutException("The heartbeat of JobManager with id " +
+									resourceID + " timed out."));
+						}
+					}
+				}
+			});
+		}
+
+		@Override
+		public void reportPayload(ResourceID resourceID, Void payload) {
+			// nothing to do since the payload is of type Void
+		}
+
+		@Override
+		public Future<Void> retrievePayload() {
+			return FlinkCompletableFuture.completed(null);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 95db932..2dcc3a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -134,9 +134,17 @@ public interface TaskExecutorGateway extends RpcGateway {
 	Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
 
 	/**
-	 * Request heartbeat from the job manager
+	 * Heartbeat request from the job manager
 	 *
-	 * @param resourceID unique id of the job manager
+	 * @param heartbeatOrigin unique id of the job manager
 	 */
-	void heartbeatFromJobManager(ResourceID resourceID);
+	void heartbeatFromJobManager(ResourceID heartbeatOrigin);
+
+	/**
+	 * Disconnects the given JobManager from the TaskManager.
+	 *
+	 * @param jobId JobID for which the JobManager was the leader
+	 * @param cause reason for the disconnection from the JobManager
+	 */
+	void disconnectJobManager(JobID jobId, Exception cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 402421c..c99eb91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -21,11 +21,10 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -73,18 +72,27 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			ResourceID resourceID,
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry) throws Exception {
 
-		this(configuration, resourceID, rpcService, highAvailabilityServices, metricRegistry, false);
+		this(
+			configuration,
+			resourceID,
+			rpcService,
+			highAvailabilityServices,
+			heartbeatServices,
+			metricRegistry,
+			false);
 	}
 
 	public TaskManagerRunner(
-		Configuration configuration,
-		ResourceID resourceID,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices,
-		MetricRegistry metricRegistry,
-		boolean localCommunicationOnly) throws Exception {
+			Configuration configuration,
+			ResourceID resourceID,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			boolean localCommunicationOnly) throws Exception {
 
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.resourceID = Preconditions.checkNotNull(resourceID);
@@ -114,13 +122,6 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		// Initialize the TM metrics
 		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
 
-		HeartbeatManagerImpl<Void, Void> heartbeatManager = new HeartbeatManagerImpl<>(
-				configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT),
-				resourceID,
-				executor,
-				rpcService.getScheduledExecutor(),
-				LOG);
-
 		this.taskManager = new TaskExecutor(
 			taskManagerConfiguration,
 			taskManagerServices.getTaskManagerLocation(),
@@ -129,8 +130,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			taskManagerServices.getIOManager(),
 			taskManagerServices.getNetworkEnvironment(),
 			highAvailabilityServices,
+			heartbeatServices,
 			metricRegistry,
-			heartbeatManager,
 			taskManagerMetricGroup,
 			taskManagerServices.getBroadcastVariableManager(),
 			taskManagerServices.getFileCache(),

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 1a9818e..d2221c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -99,11 +101,15 @@ public class JobManagerRunnerMockTest {
 		when(haServices.createBlobStore()).thenReturn(blobStore);
 		when(haServices.getRunningJobsRegistry()).thenReturn(runningJobsRegistry);
 
+		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+
 		runner = PowerMockito.spy(new JobManagerRunner(
+			ResourceID.generate(),
 			new JobGraph("test", new JobVertex("vertex")),
 			mock(Configuration.class),
 			mockRpc,
 			haServices,
+			heartbeatServices,
 			JobManagerServices.fromConfiguration(new Configuration(), haServices),
 			new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
 			jobCompletion,

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index cdad87f..567a8fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -19,17 +19,18 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatListener;
+import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
-import org.apache.flink.runtime.heartbeat.TestingHeartbeatManagerSenderImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.instance.SlotPoolGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
@@ -37,24 +38,22 @@ import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
+import org.slf4j.Logger;
 
 import java.net.InetAddress;
 import java.net.URL;
-import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.*;
 
@@ -84,32 +83,28 @@ public class JobMasterTest extends TestLogger {
 
 		final long heartbeatInterval = 1L;
 		final long heartbeatTimeout = 5L;
-		final CountDownLatch waitLatch = new CountDownLatch(1);
-		final HeartbeatManagerSenderImpl<Void, Void> jmHeartbeatManager = new TestingHeartbeatManagerSenderImpl<>(
-				waitLatch,
-				heartbeatInterval,
-				heartbeatTimeout,
-				jmResourceId,
-				rpc.getExecutor(),
-				rpc.getScheduledExecutor(),
-				log);
+
+		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
+
+		final JobGraph jobGraph = new JobGraph();
 
 		try {
 			final JobMaster jobMaster = new JobMaster(
-					new JobGraph(),
-					new Configuration(),
-					rpc,
-					haServices,
-					Executors.newScheduledThreadPool(1),
-					mock(BlobLibraryCacheManager.class),
-					mock(RestartStrategyFactory.class),
-					Time.of(10, TimeUnit.SECONDS),
-					null,
-					jmResourceId,
-					jmHeartbeatManager,
-					mock(OnCompletionActions.class),
-					testingFatalErrorHandler,
-					new FlinkUserCodeClassLoader(new URL[0]));
+				jmResourceId,
+				jobGraph,
+				new Configuration(),
+				rpc,
+				haServices,
+				heartbeatServices,
+				Executors.newScheduledThreadPool(1),
+				mock(BlobLibraryCacheManager.class),
+				mock(RestartStrategyFactory.class),
+				Time.of(10, TimeUnit.SECONDS),
+				null,
+				mock(OnCompletionActions.class),
+				testingFatalErrorHandler,
+				new FlinkUserCodeClassLoader(new URL[0]));
 
 			// also start the heartbeat manager in job manager
 			jobMaster.start(jmLeaderId);
@@ -117,24 +112,29 @@ public class JobMasterTest extends TestLogger {
 			// register task manager will trigger monitoring heartbeat target, schedule heartbeat request in interval time
 			jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId);
 
-			verify(taskExecutorGateway, atLeast(1)).heartbeatFromJobManager(eq(jmResourceId));
+			ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+			verify(scheduledExecutor, times(1)).scheduleAtFixedRate(
+				heartbeatRunnableCaptor.capture(),
+				eq(0L),
+				eq(heartbeatInterval),
+				eq(TimeUnit.MILLISECONDS));
+
+			Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue();
+
+			ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+			verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
 
-			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(jmHeartbeatManager, "heartbeatTargets");
-			final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTMsInJM = Whitebox.getInternalState(jobMaster, "registeredTaskManagers");
-			final SlotPoolGateway slotPoolGateway = mock(SlotPoolGateway.class);
-			Whitebox.setInternalState(jobMaster, "slotPoolGateway", slotPoolGateway);
+			Runnable timeoutRunnable = timeoutRunnableCaptor.getValue();
 
-			// before heartbeat timeout
-			assertTrue(heartbeatTargets.containsKey(tmResourceId));
-			assertTrue(registeredTMsInJM.containsKey(tmResourceId));
+			// run the first heartbeat request
+			heartbeatRunnable.run();
 
-			// continue to unmonitor heartbeat target
-			waitLatch.countDown();
+			verify(taskExecutorGateway, times(1)).heartbeatFromJobManager(eq(jmResourceId));
 
-			// after heartbeat timeout
-			verify(slotPoolGateway, timeout(heartbeatTimeout * 5)).releaseTaskManager(eq(tmResourceId));
-			assertFalse(heartbeatTargets.containsKey(tmResourceId));
-			assertFalse(registeredTMsInJM.containsKey(tmResourceId));
+			// run the timeout runnable to simulate a heartbeat timeout
+			timeoutRunnable.run();
+
+			verify(taskExecutorGateway).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class));
 
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
@@ -143,4 +143,32 @@ public class JobMasterTest extends TestLogger {
 			rpc.stopService();
 		}
 	}
+
+	private static class TestingHeartbeatServices extends HeartbeatServices {
+
+		private final ScheduledExecutor scheduledExecutorToUse;
+
+		public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
+			super(heartbeatInterval, heartbeatTimeout);
+
+			this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse);
+		}
+
+		@Override
+		public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+			ResourceID resourceId,
+			HeartbeatListener<I, O> heartbeatListener,
+			ScheduledExecutor scheduledExecutor,
+			Logger log) {
+
+			return new HeartbeatManagerSenderImpl<>(
+				heartbeatInterval,
+				heartbeatTimeout,
+				resourceId,
+				heartbeatListener,
+				org.apache.flink.runtime.concurrent.Executors.directExecutor(),
+				scheduledExecutorToUse,
+				log);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 5ffc97e..16edbf7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -67,6 +67,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.RETURNS_MOCKS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -104,7 +105,7 @@ public class TaskExecutorITCase {
 			rpcService.getScheduledExecutor(),
 			resourceManagerConfiguration.getJobTimeout());
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
-		HeartbeatManagerImpl heartbeatManager = mock(HeartbeatManagerImpl.class);
+		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class, RETURNS_MOCKS);
 
 		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234);
@@ -135,8 +136,8 @@ public class TaskExecutorITCase {
 			ioManager,
 			networkEnvironment,
 			testingHAServices,
+			heartbeatServices,
 			metricRegistry,
-			heartbeatManager,
 			taskManagerMetricGroup,
 			broadcastVariableManager,
 			fileCache,

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index f500246..0f5bad3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -39,8 +40,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
-import org.apache.flink.runtime.heartbeat.TestingHeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -82,16 +84,17 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.Matchers;
-import org.powermock.reflect.Whitebox;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
 
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertFalse;
@@ -124,15 +127,27 @@ public class TaskExecutorTest extends TestLogger {
 
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
-		final CountDownLatch waitLatch =  new CountDownLatch(1);
 		final long heartbeatTimeout = 10L;
-		final HeartbeatManagerImpl<Void, Void> tmHeartbeatManager = new TestingHeartbeatManagerImpl<>(
-				waitLatch,
-				heartbeatTimeout,
-				tmResourceId,
-				rpc.getExecutor(),
-				rpc.getScheduledExecutor(),
-				log);
+
+		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+		when(heartbeatServices.createHeartbeatManager(
+			eq(taskManagerLocation.getResourceID()),
+			any(HeartbeatListener.class),
+			any(ScheduledExecutor.class),
+			any(Logger.class))).thenAnswer(
+			new Answer<HeartbeatManagerImpl<Void, Void>>() {
+				@Override
+				public HeartbeatManagerImpl<Void, Void> answer(InvocationOnMock invocation) throws Throwable {
+					return new HeartbeatManagerImpl<>(
+						heartbeatTimeout,
+						taskManagerLocation.getResourceID(),
+						(HeartbeatListener<Void, Void>)invocation.getArguments()[1],
+						(Executor)invocation.getArguments()[2],
+						(ScheduledExecutor)invocation.getArguments()[2],
+						(Logger)invocation.getArguments()[3]);
+				}
+			}
+		);
 
 		final String jobMasterAddress = "jm";
 		final UUID jmLeaderId = UUID.randomUUID();
@@ -147,25 +162,26 @@ public class TaskExecutorTest extends TestLogger {
 				any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
+		when(jobMasterGateway.getHostname()).thenReturn("localhost");
 
 		try {
 			final TaskExecutor taskManager = new TaskExecutor(
-					tmConfig,
-					taskManagerLocation,
-					rpc,
-					mock(MemoryManager.class),
-					mock(IOManager.class),
-					mock(NetworkEnvironment.class),
-					haServices,
-					mock(MetricRegistry.class),
-					tmHeartbeatManager,
-					mock(TaskManagerMetricGroup.class),
-					mock(BroadcastVariableManager.class),
-					mock(FileCache.class),
-					taskSlotTable,
-					new JobManagerTable(),
-					jobLeaderService,
-					testingFatalErrorHandler);
+				tmConfig,
+				taskManagerLocation,
+				rpc,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				mock(NetworkEnvironment.class),
+				haServices,
+				heartbeatServices,
+				mock(MetricRegistry.class),
+				mock(TaskManagerMetricGroup.class),
+				mock(BroadcastVariableManager.class),
+				mock(FileCache.class),
+				taskSlotTable,
+				new JobManagerTable(),
+				jobLeaderService,
+				testingFatalErrorHandler);
 
 			taskManager.start();
 
@@ -182,23 +198,8 @@ public class TaskExecutorTest extends TestLogger {
 			verify(jobMasterGateway).registerTaskManager(
 					eq(taskManager.getAddress()), eq(taskManagerLocation), eq(jmLeaderId), any(Time.class));
 
-			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(tmHeartbeatManager, "heartbeatTargets");
-			final JobManagerTable jobManagerTable = Whitebox.getInternalState(taskManager, "jobManagerTable");
-			final Map<ResourceID, JobManagerConnection> jobManagerConnections = Whitebox.getInternalState(taskManager, "jobManagerConnections");
-
-			// before heartbeat timeout
-			assertTrue(heartbeatTargets.containsKey(jmResourceId));
-			assertTrue(jobManagerTable.contains(jobId));
-			assertTrue(jobManagerConnections.containsKey(jmResourceId));
-
-			// continue to unmonitor heartbeat target
-			waitLatch.countDown();
-
-			// after heartbeat timeout
-			verify(jobMasterGateway, timeout(heartbeatTimeout)).disconnectTaskManager(eq(tmResourceId));
-			assertFalse(heartbeatTargets.containsKey(jmResourceId));
-			assertFalse(jobManagerTable.contains(jobId));
-			assertFalse(jobManagerConnections.containsKey(jmResourceId));
+			// the timeout should trigger disconnecting from the JobManager
+			verify(jobMasterGateway, timeout(heartbeatTimeout * 5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
 
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
@@ -247,8 +248,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -324,8 +325,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -456,8 +457,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				networkEnvironment,
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				taskManagerMetricGroup,
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -563,8 +564,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -602,7 +603,7 @@ public class TaskExecutorTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that accepted slots go into state assigned and the others are returned to the  resource
+	 * Tests that accepted slots go into state assigned and the others are returned to the resource
 	 * manager.
 	 */
 	@Test
@@ -649,7 +650,6 @@ public class TaskExecutorTest extends TestLogger {
 		final AllocationID allocationId2 = new AllocationID();
 
 		final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
-		final SlotOffer offer2 = new SlotOffer(allocationId2, 0, ResourceProfile.UNKNOWN);
 
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 
@@ -677,8 +677,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -752,8 +752,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -895,8 +895,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
 				haServices,
+				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index ddeb02e..6fb7c86 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -90,6 +92,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 	private HighAvailabilityServices haServices;
 
 	@GuardedBy("lock")
+	private HeartbeatServices heartbeatServices;
+
+	@GuardedBy("lock")
 	private RpcService commonRpcService;
 
 	@GuardedBy("lock")
@@ -135,6 +140,8 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 			synchronized (lock) {
 				LOG.info("Starting High Availability Services");
 				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+
+				heartbeatServices = HeartbeatServices.fromConfiguration(config);
 				
 				metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 				commonRpcService = createRpcService(config, appMasterHostname, amPortRange);
@@ -210,11 +217,14 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 
 		// now the JobManagerRunner
 		return new JobManagerRunner(
-				jobGraph, config,
-				commonRpcService,
-				haServices,
-				this,
-				this);
+			ResourceID.generate(),
+			jobGraph,
+			config,
+			commonRpcService,
+			haServices,
+			heartbeatServices,
+			this,
+			this);
 	}
 
 	protected void shutdown(ApplicationStatus status, String msg) {

http://git-wip-us.apache.org/repos/asf/flink/blob/97ccc147/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 414c3de..2369765 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -208,11 +209,19 @@ public class YarnTaskExecutorRunner {
 			LOG.info("YARN assigned resource id {} for the task executor.", resourceID.toString());
 
 			haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+			HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(config);
+
 			metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 			// ---- (2) init task manager runner -------
 			taskExecutorRpcService = TaskManagerRunner.createRpcService(config, haServices);
-			taskManagerRunner = new TaskManagerRunner(config, resourceID, taskExecutorRpcService, haServices, metricRegistry);
+			taskManagerRunner = new TaskManagerRunner(
+				config,
+				resourceID,
+				taskExecutorRpcService,
+				haServices,
+				heartbeatServices,
+				metricRegistry);
 
 			// ---- (3) start the task manager runner
 			taskManagerRunner.start();