You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:46:18 UTC

[36/50] [abbrv] flink git commit: [FLINK-4478] [flip-6] Add HeartbeatManager

[FLINK-4478] [flip-6] Add HeartbeatManager

Add a heartbeat manager abstraction which can monitor heartbeat targets. Whenever
no heartbeat signal has been received for a heartbeat timeout interval, the
heartbeat manager will issue a heartbeat timeout notification.

Add resourceID to HeartbeatListener.reportPayload

Replace scala future by Flink's futures

Add unmonitoring test

This closes #2435.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e4eb4f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e4eb4f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e4eb4f9

Branch: refs/heads/flip-6
Commit: 3e4eb4f92012265b6fff27f0544fcd6d1629431f
Parents: 214113e
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 25 14:05:07 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:42 2016 +0200

----------------------------------------------------------------------
 .../runtime/heartbeat/HeartbeatListener.java    |  62 ++++
 .../runtime/heartbeat/HeartbeatManager.java     |  67 ++++
 .../runtime/heartbeat/HeartbeatManagerImpl.java | 328 +++++++++++++++++++
 .../heartbeat/HeartbeatManagerSenderImpl.java   |  81 +++++
 .../runtime/heartbeat/HeartbeatTarget.java      |  50 +++
 .../runtime/heartbeat/HeartbeatManagerTest.java | 315 ++++++++++++++++++
 .../slotmanager/SlotProtocolTest.java           |   4 -
 7 files changed, 903 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
new file mode 100644
index 0000000..8c08251
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+
+/**
+ * Interface for the interaction with the {@link HeartbeatManager}. The heartbeat listener is used
+ * for the following things:
+ * <p>
+ * <ul>
+ *     <li>Notifications about heartbeat timeouts</li>
+ *     <li>Payload reports of incoming heartbeats</li>
+ *     <li>Retrieval of payloads for outgoing heartbeats</li>
+ * </ul>
+ * @param <I> Type of the incoming payload
+ * @param <O> Type of the outgoing payload
+ */
+public interface HeartbeatListener<I, O> {
+
+	/**
+	 * Callback which is called if a heartbeat for the machine identified by the given resource
+	 * ID times out.
+	 *
+	 * @param resourceID Resource ID of the machine whose heartbeat has timed out
+	 */
+	void notifyHeartbeatTimeout(ResourceID resourceID);
+
+	/**
+	 * Callback which is called whenever a heartbeat with an associated payload is received. The
+	 * carried payload is given to this method.
+	 *
+	 * @param resourceID Resource ID identifying the sender of the payload
+	 * @param payload Payload of the received heartbeat
+	 */
+	void reportPayload(ResourceID resourceID, I payload);
+
+	/**
+	 * Retrieves the payload value for the next heartbeat message. Since the operation can happen
+	 * asynchronously, the result is returned wrapped in a future.
+	 *
+	 * @return Future containing the next payload for heartbeats
+	 */
+	Future<O> retrievePayload();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
new file mode 100644
index 0000000..12918ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+/**
+ * A heartbeat manager has to be able to do the following things:
+ *
+ * <ul>
+ *     <li>Monitor {@link HeartbeatTarget} and report heartbeat timeouts for this target</li>
+ *     <li>Stop monitoring a {@link HeartbeatTarget}</li>
+ * </ul>
+ *
+ *
+ * @param <I> Type of the incoming payload
+ * @param <O> Type of the outgoing payload
+ */
+public interface HeartbeatManager<I, O> {
+
+	/**
+	 * Start monitoring a {@link HeartbeatTarget}. Heartbeat timeouts for this target are reported
+	 * to the {@link HeartbeatListener} associated with this heartbeat manager.
+	 *
+	 * @param resourceID Resource ID identifying the heartbeat target
+	 * @param heartbeatTarget Interface to send heartbeat requests and responses to the heartbeat
+	 *                        target
+	 */
+	void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget);
+
+	/**
+	 * Stops monitoring the heartbeat target with the associated resource ID.
+	 *
+	 * @param resourceID Resource ID of the heartbeat target which shall no longer be monitored
+	 */
+	void unmonitorTarget(ResourceID resourceID);
+
+	/**
+	 * Starts the heartbeat manager with the given {@link HeartbeatListener}. The heartbeat listener
+	 * is notified about heartbeat timeouts; incoming heartbeat payloads are reported to it and
+	 * outgoing heartbeat payloads are retrieved from it.
+	 *
+	 * @param heartbeatListener Heartbeat listener associated with the heartbeat manager
+	 */
+	void start(HeartbeatListener<I, O> heartbeatListener);
+
+	/**
+	 * Stops the heartbeat manager.
+	 */
+	void stop();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
new file mode 100644
index 0000000..042f95b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Heartbeat manager implementation. The heartbeat manager maintains a map of heartbeat monitors
+ * and resource IDs. Each monitor will be updated when a new heartbeat of the associated machine has
+ * been received. If the monitor detects that a heartbeat has timed out, it will notify the
+ * {@link HeartbeatListener} about it. A heartbeat times out iff no heartbeat signal has been
+ * received within a given timeout interval.
+ *
+ * @param <I> Type of the incoming heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload
+ */
+@ThreadSafe
+public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, HeartbeatTarget<I> {
+
+	/** Heartbeat timeout interval in milliseconds */
+	private final long heartbeatTimeoutIntervalMs;
+
+	/** Resource ID which is used to mark one's own heartbeat signals */
+	private final ResourceID ownResourceID;
+
+	/** Executor service used to run heartbeat timeout notifications */
+	private final ScheduledExecutorService scheduledExecutorService;
+
+	protected final Logger log;
+
+	/** Map containing the heartbeat monitors associated with the respective resource ID */
+	private final ConcurrentHashMap<ResourceID, HeartbeatManagerImpl.HeartbeatMonitor<O>> heartbeatTargets;
+
+	/** Execution context used to run future callbacks */
+	private final Executor executor;
+
+	/** Heartbeat listener with which the heartbeat manager has been associated */
+	private HeartbeatListener<I, O> heartbeatListener;
+
+	/** Running state of the heartbeat manager */
+	protected volatile boolean stopped;
+
+	public HeartbeatManagerImpl(
+		long heartbeatTimeoutIntervalMs,
+		ResourceID ownResourceID,
+		Executor executor,
+		ScheduledExecutorService scheduledExecutorService,
+		Logger log) {
+		Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout has to be larger than 0.");
+
+		this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
+		this.ownResourceID = Preconditions.checkNotNull(ownResourceID);
+		this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+		this.log = Preconditions.checkNotNull(log);
+		this.executor = Preconditions.checkNotNull(executor);
+		this.heartbeatTargets = new ConcurrentHashMap<>(16);
+		// the manager is created in the stopped state; start(HeartbeatListener) activates it
+		stopped = true;
+	}
+
+	//----------------------------------------------------------------------------------------------
+	// Getters
+	//----------------------------------------------------------------------------------------------
+
+	ResourceID getOwnResourceID() {
+		return ownResourceID;
+	}
+
+	Executor getExecutor() {
+		return executor;
+	}
+
+	HeartbeatListener<I, O> getHeartbeatListener() {
+		return heartbeatListener;
+	}
+
+	Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>> getHeartbeatTargets() {
+		return heartbeatTargets.values();
+	}
+
+	//----------------------------------------------------------------------------------------------
+	// HeartbeatManager methods
+	//----------------------------------------------------------------------------------------------
+
+	@Override
+	public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
+		if (!stopped) {
+			// cheap pre-check which avoids scheduling a timeout for an already known target
+			if (heartbeatTargets.containsKey(resourceID)) {
+				log.info("The target with resource ID {} is already been monitored.", resourceID);
+			} else {
+				HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
+					resourceID,
+					heartbeatTarget,
+					scheduledExecutorService,
+					heartbeatListener,
+					heartbeatTimeoutIntervalMs);
+				// Insert atomically: a plain put could replace a monitor registered by a concurrent call;
+				// the replaced monitor could no longer be reset or canceled and would report a timeout.
+				if (heartbeatTargets.putIfAbsent(resourceID, heartbeatMonitor) != null) {
+					heartbeatMonitor.cancel();
+					log.info("The target with resource ID {} is already been monitored.", resourceID);
+				} else if (stopped) {
+					// concurrent stop operation: undo our own insert, but nobody else's
+					heartbeatMonitor.cancel();
+					heartbeatTargets.remove(resourceID, heartbeatMonitor);
+				}
+			}
+		}
+	}
+
+	@Override
+	public void unmonitorTarget(ResourceID resourceID) {
+		if (stopped) {
+			return;
+		}
+		final HeartbeatManagerImpl.HeartbeatMonitor<O> removedMonitor = heartbeatTargets.remove(resourceID);
+		if (removedMonitor != null) {
+			removedMonitor.cancel();
+		}
+	}
+
+	@Override
+	public void start(HeartbeatListener<I, O> heartbeatListener) {
+		Preconditions.checkState(stopped, "Cannot start an already started heartbeat manager.");
+		// Validate and publish the listener before the volatile write to stopped: every thread
+		// which observes the manager as running is then guaranteed to see a non-null listener.
+		this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener);
+		stopped = false;
+	}
+
+	@Override
+	public void stop() {
+		stopped = true;
+		// Raise the flag first: monitorTarget re-checks it after inserting and undoes its own insert
+		for (HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor : heartbeatTargets.values()) {
+			heartbeatMonitor.cancel();
+		}
+
+		heartbeatTargets.clear();
+	}
+
+	//----------------------------------------------------------------------------------------------
+	// HeartbeatTarget methods
+	//----------------------------------------------------------------------------------------------
+
+	@Override
+	public void sendHeartbeat(ResourceID resourceID, I payload) {
+		if (stopped) {
+			return;
+		}
+		log.debug("Received heartbeat from {}.", resourceID);
+		reportHeartbeat(resourceID);
+		if (payload != null) {
+			heartbeatListener.reportPayload(resourceID, payload);
+		}
+	}
+
+	@Override
+	public void requestHeartbeat(ResourceID resourceID, I payload) {
+		if (stopped) {
+			return;
+		}
+		log.debug("Received heartbeat request from {}.", resourceID);
+		final HeartbeatTarget<O> requester = reportHeartbeat(resourceID);
+		if (requester == null) {
+			// requests from resource IDs without a registered monitor are not answered
+			return;
+		}
+		if (payload != null) {
+			heartbeatListener.reportPayload(resourceID, payload);
+		}
+		final Future<O> outgoingPayload = heartbeatListener.retrievePayload();
+		if (outgoingPayload == null) {
+			// the listener offers no payload future, answer with an empty heartbeat
+			requester.sendHeartbeat(ownResourceID, null);
+			return;
+		}
+		outgoingPayload.thenAcceptAsync(new AcceptFunction<O>() {
+			@Override
+			public void accept(O retrievedPayload) {
+				requester.sendHeartbeat(getOwnResourceID(), retrievedPayload);
+			}
+		}, executor);
+	}
+
+	HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) {
+		// Look the monitor up exactly once: with a containsKey/get pair a concurrent removal
+		// between the two calls would make get return null and the dereference below fail.
+		HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = heartbeatTargets.get(resourceID);
+		if (heartbeatMonitor == null) {
+			return null;
+		}
+		heartbeatMonitor.reportHeartbeat();
+		return heartbeatMonitor.getHeartbeatTarget();
+	}
+
+	//----------------------------------------------------------------------------------------------
+	// Utility classes
+	//----------------------------------------------------------------------------------------------
+
+	/**
+	 * Heartbeat monitor which manages the heartbeat state of the associated heartbeat target. The
+	 * monitor notifies the {@link HeartbeatListener} whenever it has not seen a heartbeat signal
+	 * in the specified heartbeat timeout interval. Each heartbeat signal resets this timer.
+	 *
+	 * @param <O> Type of the payload being sent to the associated heartbeat target
+	 */
+	static class HeartbeatMonitor<O> implements Runnable {
+
+		/** Resource ID of the monitored heartbeat target */
+		private final ResourceID resourceID;
+
+		/** Associated heartbeat target */
+		private final HeartbeatTarget<O> heartbeatTarget;
+
+		private final ScheduledExecutorService scheduledExecutorService;
+
+		/** Listener which is notified about heartbeat timeouts */
+		private final HeartbeatListener<?, ?> heartbeatListener;
+
+		/** Maximum heartbeat timeout interval */
+		private final long heartbeatTimeoutIntervalMs;
+
+		private volatile ScheduledFuture<?> futureTimeout;
+
+		private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
+
+		HeartbeatMonitor(
+			ResourceID resourceID,
+			HeartbeatTarget<O> heartbeatTarget,
+			ScheduledExecutorService scheduledExecutorService,
+			HeartbeatListener<?, O> heartbeatListener,
+			long heartbeatTimeoutIntervalMs) {
+
+			this.resourceID = Preconditions.checkNotNull(resourceID);
+			this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget);
+			this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+			this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener);
+			// strictly positive, as the message states and as HeartbeatManagerImpl's constructor enforces
+			Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout interval has to be larger than 0.");
+			this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
+			// arm the first timeout right away, it is re-armed by every reported heartbeat
+			resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
+		}
+
+		HeartbeatTarget<O> getHeartbeatTarget() {
+			return heartbeatTarget;
+		}
+
+		void reportHeartbeat() {
+			resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
+		}
+
+		void resetHeartbeatTimeout(long heartbeatTimeout) {
+			if (state.get() == State.RUNNING) {
+				cancelTimeout();
+
+				futureTimeout = scheduledExecutorService.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);
+
+				// Double check for concurrent accesses (e.g. a firing of the scheduled future)
+				if (state.get() != State.RUNNING) {
+					cancelTimeout();
+				}
+			}
+		}
+
+		void cancel() {
+			// we can only cancel if we are in state running
+			if (state.compareAndSet(State.RUNNING, State.CANCELED)) {
+				cancelTimeout();
+			}
+		}
+
+		private void cancelTimeout() {
+			if (futureTimeout != null) {
+				futureTimeout.cancel(true);
+			}
+		}
+
+		public boolean isCanceled() {
+			return state.get() == State.CANCELED;
+		}
+
+		@Override
+		public void run() {
+			// The heartbeat has timed out if we're in state running
+			if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
+				heartbeatListener.notifyHeartbeatTimeout(resourceID);
+			}
+		}
+
+		private enum State {
+			RUNNING,
+			TIMEOUT,
+			CANCELED
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
new file mode 100644
index 0000000..588ba7f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.slf4j.Logger;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link HeartbeatManager} implementation which regularly requests a heartbeat response from
+ * its monitored {@link HeartbeatTarget}. The heartbeat period is configurable.
+ *
+ * @param <I> Type of the incoming heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload
+ */
+public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
+	/** Handle of the periodically scheduled heartbeat request task; canceled in {@link #stop()} */
+	private final ScheduledFuture<?> triggerFuture;
+
+	public HeartbeatManagerSenderImpl(
+		long heartbeatPeriod,
+		long heartbeatTimeout,
+		ResourceID ownResourceID,
+		ExecutorService executorService,
+		ScheduledExecutorService scheduledExecutorService,
+		Logger log) {
+		super(heartbeatTimeout, ownResourceID, executorService, scheduledExecutorService, log);
+
+		triggerFuture = scheduledExecutorService.scheduleAtFixedRate(this, 0L, heartbeatPeriod, TimeUnit.MILLISECONDS);
+	}
+
+	@Override
+	public void run() {
+		if (!stopped) {
+			log.debug("Trigger heartbeat request.");
+			for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) {
+				Future<O> futurePayload = getHeartbeatListener().retrievePayload();
+				final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
+
+				if (futurePayload != null) {
+					futurePayload.thenAcceptAsync(new AcceptFunction<O>() {
+						@Override
+						public void accept(O payload) {
+							heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
+						}
+					}, getExecutor());
+				} else {
+					heartbeatTarget.requestHeartbeat(getOwnResourceID(), null);
+				}
+			}
+		}
+	}
+
+	@Override
+	public void stop() {
+			triggerFuture.cancel(true);
+			super.stop();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
new file mode 100644
index 0000000..ef953de
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+/**
+ * Interface for components which can be sent heartbeats and from which one can request a
+ * heartbeat response. Both the heartbeat response as well as the heartbeat request can carry a
+ * payload. This payload is reported to the heartbeat target and contains additional information.
+ * The payload can be empty which is indicated by a null value.
+ *
+ * @param <I> Type of the payload which is sent to the heartbeat target
+ */
+public interface HeartbeatTarget<I> {
+
+	/**
+	 * Sends a heartbeat response to the target. Each heartbeat response can carry a payload which
+	 * contains additional information for the heartbeat target.
+	 *
+	 * @param resourceID Resource ID identifying the machine for which a heartbeat shall be reported.
+	 * @param payload Payload of the heartbeat response. Null indicates an empty payload.
+	 */
+	void sendHeartbeat(ResourceID resourceID, I payload);
+
+	/**
+	 * Requests a heartbeat from the target. Each heartbeat request can carry a payload which
+	 * contains additional information for the heartbeat target.
+	 *
+	 * @param resourceID Resource ID identifying the machine issuing the heartbeat request.
+	 * @param payload Payload of the heartbeat request. Null indicates an empty payload.
+	 */
+	void requestHeartbeat(ResourceID resourceID, I payload);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
new file mode 100644
index 0000000..1c62f17
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.util.DirectExecutorService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class HeartbeatManagerTest extends TestLogger {
+	private static final Logger LOG = LoggerFactory.getLogger(HeartbeatManagerTest.class);
+
+	/**
+	 * Tests that regular heartbeat signal triggers the right callback functions in the
+	 * {@link HeartbeatListener}.
+	 */
+	@Test
+	public void testRegularHeartbeat() {
+		long heartbeatTimeout = 1000L;
+		ResourceID ownResourceID = new ResourceID("foobar");
+		ResourceID targetResourceID = new ResourceID("barfoo");
+		HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
+		ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+
+		Object expectedObject = new Object();
+
+		when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject));
+
+		HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
+			heartbeatTimeout,
+			ownResourceID,
+			new DirectExecutorService(),
+			scheduledExecutorService,
+			LOG);
+
+		heartbeatManager.start(heartbeatListener);
+
+		HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
+
+		heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget);
+
+		heartbeatManager.requestHeartbeat(targetResourceID, expectedObject);
+
+		verify(heartbeatListener, times(1)).reportPayload(targetResourceID, expectedObject);
+		verify(heartbeatListener, times(1)).retrievePayload();
+		verify(heartbeatTarget, times(1)).sendHeartbeat(ownResourceID, expectedObject);
+
+		heartbeatManager.sendHeartbeat(targetResourceID, expectedObject);
+
+		verify(heartbeatListener, times(2)).reportPayload(targetResourceID, expectedObject);
+	}
+
+	/**
+	 * Tests that the heartbeat monitors are updated when receiving a new heartbeat signal.
+	 */
+	@Test
+	public void testHeartbeatMonitorUpdate() {
+		long heartbeatTimeout = 1000L;
+		ResourceID ownResourceID = new ResourceID("foobar");
+		ResourceID targetResourceID = new ResourceID("barfoo");
+		HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
+		ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+		ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
+
+		doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+		Object expectedObject = new Object();
+
+		when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject));
+
+		HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
+			heartbeatTimeout,
+			ownResourceID,
+			new DirectExecutorService(),
+			scheduledExecutorService,
+			LOG);
+
+		heartbeatManager.start(heartbeatListener);
+
+		HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
+
+		heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget);
+
+		heartbeatManager.sendHeartbeat(targetResourceID, expectedObject);
+
+		verify(scheduledFuture, times(1)).cancel(true);
+		verify(scheduledExecutorService, times(2)).schedule(any(Runnable.class), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
+	}
+
+	/**
+	 * Tests that a heartbeat timeout is signaled if the heartbeat is not reported in time.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testHeartbeatTimeout() throws Exception {
+		long heartbeatTimeout = 100L;
+		int numHeartbeats = 10;
+		long heartbeatInterval = 20L;
+		Object payload = new Object();
+
+		ResourceID ownResourceID = new ResourceID("foobar");
+		ResourceID targetResourceID = new ResourceID("barfoo");
+		TestingHeartbeatListener heartbeatListener = new TestingHeartbeatListener(payload);
+		ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+		ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
+
+		doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+		Object expectedObject = new Object();
+
+		HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
+			heartbeatTimeout,
+			ownResourceID,
+			new DirectExecutorService(),
+			new ScheduledThreadPoolExecutor(1),
+			LOG);
+
+		heartbeatManager.start(heartbeatListener);
+
+		HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
+
+		Future<ResourceID> timeoutFuture = heartbeatListener.getTimeoutFuture();
+
+		heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget);
+
+		for (int i = 0; i < numHeartbeats; i++) {
+			heartbeatManager.sendHeartbeat(targetResourceID, expectedObject);
+			Thread.sleep(heartbeatInterval);
+		}
+
+		assertFalse(timeoutFuture.isDone());
+
+		ResourceID timeoutResourceID = timeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
+
+		assertEquals(targetResourceID, timeoutResourceID);
+	}
+
+	/**
+	 * Tests the heartbeat interplay between the {@link HeartbeatManagerImpl} and the
+	 * {@link HeartbeatManagerSenderImpl}. The sender should regularly trigger heartbeat requests
+	 * which are fulfilled by the receiver. Upon stopping the receiver, the sender should notify
+	 * the heartbeat listener about the heartbeat timeout.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testHeartbeatCluster() throws Exception {
+		long heartbeatTimeout = 100L;
+		long heartbeatPeriod = 20L;
+		Object object = new Object();
+		Object object2 = new Object();
+		ResourceID resourceID = new ResourceID("foobar");
+		ResourceID resourceID2 = new ResourceID("barfoo");
+		HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
+
+		when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(object));
+
+		TestingHeartbeatListener heartbeatListener2 = new TestingHeartbeatListener(object2);
+
+		Future<ResourceID> futureTimeout = heartbeatListener2.getTimeoutFuture();
+
+		HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
+			heartbeatTimeout,
+			resourceID,
+			new DirectExecutorService(),
+			new ScheduledThreadPoolExecutor(1),
+			LOG);
+
+		HeartbeatManagerSenderImpl<Object, Object> heartbeatManager2 = new HeartbeatManagerSenderImpl<>(
+			heartbeatPeriod,
+			heartbeatTimeout,
+			resourceID2,
+			new DirectExecutorService(),
+			new ScheduledThreadPoolExecutor(1),
+			LOG);;
+
+		heartbeatManager.start(heartbeatListener);
+		heartbeatManager2.start(heartbeatListener2);
+
+		heartbeatManager.monitorTarget(resourceID2, heartbeatManager2);
+		heartbeatManager2.monitorTarget(resourceID, heartbeatManager);
+
+		Thread.sleep(2 * heartbeatTimeout);
+
+		assertFalse(futureTimeout.isDone());
+
+		heartbeatManager.stop();
+
+		ResourceID timeoutResourceID = futureTimeout.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
+
+		assertEquals(resourceID, timeoutResourceID);
+
+		int numberHeartbeats = (int) (2 * heartbeatTimeout / heartbeatPeriod);
+
+		verify(heartbeatListener, atLeast(numberHeartbeats / 2)).reportPayload(resourceID2, object2);
+		assertTrue(heartbeatListener2.getNumberHeartbeatReports() >= numberHeartbeats / 2);
+	}
+
+	/**
+	 * Tests that after unmonitoring a target, there won't be a timeout triggered
+	 */
+	@Test
+	public void testTargetUnmonitoring() throws InterruptedException, ExecutionException {
+		// this might be too aggresive for Travis, let's see...
+		long heartbeatTimeout = 100L;
+		ResourceID resourceID = new ResourceID("foobar");
+		ResourceID targetID = new ResourceID("target");
+		Object object = new Object();
+
+		HeartbeatManager<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
+			heartbeatTimeout,
+			resourceID,
+			new DirectExecutorService(),
+			new ScheduledThreadPoolExecutor(1),
+			LOG);
+
+		TestingHeartbeatListener heartbeatListener = new TestingHeartbeatListener(object);
+
+		heartbeatManager.start(heartbeatListener);
+
+		heartbeatManager.monitorTarget(targetID, mock(HeartbeatTarget.class));
+
+		heartbeatManager.unmonitorTarget(targetID);
+
+		Future<ResourceID> timeout = heartbeatListener.getTimeoutFuture();
+
+
+		try {
+			timeout.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
+			fail("Timeout should time out.");
+		} catch (TimeoutException e) {
+			// the timeout should not be completed since we unmonitored the target
+		}
+	}
+
+	static class TestingHeartbeatListener implements HeartbeatListener<Object, Object> {
+
+		private final CompletableFuture<ResourceID> future = new FlinkCompletableFuture<>();
+
+		private final Object payload;
+
+		private int numberHeartbeatReports;
+
+		TestingHeartbeatListener(Object payload) {
+			this.payload = payload;
+		}
+
+		public Future<ResourceID> getTimeoutFuture() {
+			return future;
+		}
+
+		public int getNumberHeartbeatReports() {
+			return numberHeartbeatReports;
+		}
+
+		@Override
+		public void notifyHeartbeatTimeout(ResourceID resourceID) {
+			future.complete(resourceID);
+		}
+
+		@Override
+		public void reportPayload(ResourceID resourceID, Object payload) {
+			numberHeartbeatReports++;
+		}
+
+		@Override
+		public Future<Object> retrievePayload() {
+			return FlinkCompletableFuture.completed(payload);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 805ea71..a87fe42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -43,12 +42,9 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.Collections;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;