You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:46:18 UTC
[36/50] [abbrv] flink git commit: [FLINK-4478] [flip-6] Add
HeartbeatManager
[FLINK-4478] [flip-6] Add HeartbeatManager
Add a heartbeat manager abstraction which can monitor heartbeat targets. Whenever
no heartbeat signal has been received for a heartbeat timeout interval, the
heartbeat manager will issue a heartbeat timeout notification.
Add resourceID to HeartbeatListener.reportPayload
Replace scala future by Flink's futures
Add unmonitoring test
This closes #2435.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e4eb4f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e4eb4f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e4eb4f9
Branch: refs/heads/flip-6
Commit: 3e4eb4f92012265b6fff27f0544fcd6d1629431f
Parents: 214113e
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 25 14:05:07 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:42 2016 +0200
----------------------------------------------------------------------
.../runtime/heartbeat/HeartbeatListener.java | 62 ++++
.../runtime/heartbeat/HeartbeatManager.java | 67 ++++
.../runtime/heartbeat/HeartbeatManagerImpl.java | 328 +++++++++++++++++++
.../heartbeat/HeartbeatManagerSenderImpl.java | 81 +++++
.../runtime/heartbeat/HeartbeatTarget.java | 50 +++
.../runtime/heartbeat/HeartbeatManagerTest.java | 315 ++++++++++++++++++
.../slotmanager/SlotProtocolTest.java | 4 -
7 files changed, 903 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
new file mode 100644
index 0000000..8c08251
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+
+/**
+ * Interface for the interaction with the {@link HeartbeatManager}. The heartbeat listener is used
+ * for the following things:
+ *
+ * <ul>
+ * <li>Notifications about heartbeat timeouts</li>
+ * <li>Payload reports of incoming heartbeats</li>
+ * <li>Retrieval of payloads for outgoing heartbeats</li>
+ * </ul>
+ *
+ * @param <I> Type of the incoming payload
+ * @param <O> Type of the outgoing payload
+ */
+public interface HeartbeatListener<I, O> {
+
+	/**
+	 * Callback which is called if a heartbeat for the machine identified by the given resource
+	 * ID times out.
+	 *
+	 * @param resourceID Resource ID of the machine whose heartbeat has timed out
+	 */
+	void notifyHeartbeatTimeout(ResourceID resourceID);
+
+	/**
+	 * Callback which is called whenever a heartbeat with an associated payload is received. The
+	 * carried payload is given to this method. The heartbeat manager implementations in this
+	 * package only invoke it for non-null payloads.
+	 *
+	 * @param resourceID Resource ID identifying the sender of the payload
+	 * @param payload Payload of the received heartbeat
+	 */
+	void reportPayload(ResourceID resourceID, I payload);
+
+	/**
+	 * Retrieves the payload value for the next heartbeat message. Since the operation can happen
+	 * asynchronously, the result is returned wrapped in a future. If {@code null} is returned
+	 * instead of a future, the heartbeat managers in this package send the heartbeat with an
+	 * empty ({@code null}) payload.
+	 *
+	 * @return Future containing the next payload for heartbeats
+	 */
+	Future<O> retrievePayload();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
new file mode 100644
index 0000000..12918ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+/**
+ * Component which watches a set of {@link HeartbeatTarget}s and signals a heartbeat timeout for
+ * every target which stays silent for too long. Its responsibilities are:
+ *
+ * <ul>
+ * <li>starting to watch a {@link HeartbeatTarget} and reporting its heartbeat timeouts</li>
+ * <li>ceasing to watch a previously registered {@link HeartbeatTarget}</li>
+ * </ul>
+ *
+ * @param <I> Type of the incoming payload
+ * @param <O> Type of the outgoing payload
+ */
+public interface HeartbeatManager<I, O> {
+
+	/**
+	 * Activates this heartbeat manager and binds it to a {@link HeartbeatListener}. The listener
+	 * receives heartbeat timeout notifications, is handed the payloads of incoming heartbeats and
+	 * supplies the payloads of outgoing heartbeats.
+	 *
+	 * @param heartbeatListener Listener which this heartbeat manager reports to
+	 */
+	void start(HeartbeatListener<I, O> heartbeatListener);
+
+	/**
+	 * Deactivates this heartbeat manager.
+	 */
+	void stop();
+
+	/**
+	 * Registers a {@link HeartbeatTarget} for monitoring. Whenever this target times out, the
+	 * {@link HeartbeatListener} bound to this manager is informed.
+	 *
+	 * @param resourceID Resource ID under which the heartbeat target is registered
+	 * @param heartbeatTarget Handle used to send heartbeat requests and responses to the target
+	 */
+	void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget);
+
+	/**
+	 * Deregisters the heartbeat target known under the given resource ID, so that no further
+	 * timeouts are reported for it.
+	 *
+	 * @param resourceID Resource ID of the heartbeat target to stop monitoring
+	 */
+	void unmonitorTarget(ResourceID resourceID);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
new file mode 100644
index 0000000..042f95b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Heartbeat manager implementation. The heartbeat manager maintains a map of heartbeat monitors
+ * and resource IDs. Each monitor will be updated when a new heartbeat of the associated machine has
+ * been received. If the monitor detects that a heartbeat has timed out, it will notify the
+ * {@link HeartbeatListener} about it. A heartbeat times out iff no heartbeat signal has been
+ * received within a given timeout interval.
+ *
+ * @param <I> Type of the incoming heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload
+ */
+@ThreadSafe
+public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, HeartbeatTarget<I> {
+
+ /** Heartbeat timeout interval in milli seconds */
+ private final long heartbeatTimeoutIntervalMs;
+
+ /** Resource ID which is used to mark one own's heartbeat signals */
+ private final ResourceID ownResourceID;
+
+ /** Executor service used to run heartbeat timeout notifications */
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ protected final Logger log;
+
+ /** Map containing the heartbeat monitors associated with the respective resource ID */
+ private final ConcurrentHashMap<ResourceID, HeartbeatManagerImpl.HeartbeatMonitor<O>> heartbeatTargets;
+
+ /** Execution context used to run future callbacks */
+ private final Executor executor;
+
+ /** Heartbeat listener with which the heartbeat manager has been associated */
+ private HeartbeatListener<I, O> heartbeatListener;
+
+ /** Running state of the heartbeat manager */
+ protected volatile boolean stopped;
+
+ public HeartbeatManagerImpl(
+ long heartbeatTimeoutIntervalMs,
+ ResourceID ownResourceID,
+ Executor executor,
+ ScheduledExecutorService scheduledExecutorService,
+ Logger log) {
+ Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout has to be larger than 0.");
+
+ this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
+ this.ownResourceID = Preconditions.checkNotNull(ownResourceID);
+ this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+ this.log = Preconditions.checkNotNull(log);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.heartbeatTargets = new ConcurrentHashMap<>(16);
+
+ stopped = true;
+ }
+
+ //----------------------------------------------------------------------------------------------
+ // Getters
+ //----------------------------------------------------------------------------------------------
+
+ ResourceID getOwnResourceID() {
+ return ownResourceID;
+ }
+
+ Executor getExecutor() {
+ return executor;
+ }
+
+ HeartbeatListener<I, O> getHeartbeatListener() {
+ return heartbeatListener;
+ }
+
+ Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>> getHeartbeatTargets() {
+ return heartbeatTargets.values();
+ }
+
+ //----------------------------------------------------------------------------------------------
+ // HeartbeatManager methods
+ //----------------------------------------------------------------------------------------------
+
+ @Override
+ public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
+ if (!stopped) {
+ if (heartbeatTargets.containsKey(resourceID)) {
+ log.info("The target with resource ID {} is already been monitored.", resourceID);
+ } else {
+ HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
+ resourceID,
+ heartbeatTarget,
+ scheduledExecutorService,
+ heartbeatListener,
+ heartbeatTimeoutIntervalMs);
+
+ heartbeatTargets.put(
+ resourceID,
+ heartbeatMonitor);
+
+ // check if we have stopped in the meantime (concurrent stop operation)
+ if (stopped) {
+ heartbeatMonitor.cancel();
+
+ heartbeatTargets.remove(resourceID);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void unmonitorTarget(ResourceID resourceID) {
+ if (!stopped) {
+ HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = heartbeatTargets.remove(resourceID);
+
+ if (heartbeatMonitor != null) {
+ heartbeatMonitor.cancel();
+ }
+ }
+ }
+
+ @Override
+ public void start(HeartbeatListener<I, O> heartbeatListener) {
+ Preconditions.checkState(stopped, "Cannot start an already started heartbeat manager.");
+
+ stopped = false;
+
+ this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener);
+ }
+
+ @Override
+ public void stop() {
+ stopped = true;
+
+ for (HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor : heartbeatTargets.values()) {
+ heartbeatMonitor.cancel();
+ }
+
+ heartbeatTargets.clear();
+ }
+
+ //----------------------------------------------------------------------------------------------
+ // HeartbeatTarget methods
+ //----------------------------------------------------------------------------------------------
+
+ @Override
+ public void sendHeartbeat(ResourceID resourceID, I payload) {
+ if (!stopped) {
+ log.debug("Received heartbeat from {}.", resourceID);
+ reportHeartbeat(resourceID);
+
+ if (payload != null) {
+ heartbeatListener.reportPayload(resourceID, payload);
+ }
+ }
+ }
+
+ @Override
+ public void requestHeartbeat(ResourceID resourceID, I payload) {
+ if (!stopped) {
+ log.debug("Received heartbeat request from {}.", resourceID);
+
+ final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(resourceID);
+
+ if (heartbeatTarget != null) {
+ if (payload != null) {
+ heartbeatListener.reportPayload(resourceID, payload);
+ }
+
+ Future<O> futurePayload = heartbeatListener.retrievePayload();
+
+ if (futurePayload != null) {
+ futurePayload.thenAcceptAsync(new AcceptFunction<O>() {
+ @Override
+ public void accept(O retrievedPayload) {
+ heartbeatTarget.sendHeartbeat(getOwnResourceID(), retrievedPayload);
+ }
+ }, executor);
+ } else {
+ heartbeatTarget.sendHeartbeat(ownResourceID, null);
+ }
+ }
+ }
+ }
+
+ HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) {
+ if (heartbeatTargets.containsKey(resourceID)) {
+ HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = heartbeatTargets.get(resourceID);
+ heartbeatMonitor.reportHeartbeat();
+
+ return heartbeatMonitor.getHeartbeatTarget();
+ } else {
+ return null;
+ }
+ }
+
+ //----------------------------------------------------------------------------------------------
+ // Utility classes
+ //----------------------------------------------------------------------------------------------
+
+ /**
+ * Heartbeat monitor which manages the heartbeat state of the associated heartbeat target. The
+ * monitor notifies the {@link HeartbeatListener} whenever it has not seen a heartbeat signal
+ * in the specified heartbeat timeout interval. Each heartbeat signal resets this timer.
+ *
+ * @param <O> Type of the payload being sent to the associated heartbeat target
+ */
+ static class HeartbeatMonitor<O> implements Runnable {
+
+ /** Resource ID of the monitored heartbeat target */
+ private final ResourceID resourceID;
+
+ /** Associated heartbeat target */
+ private final HeartbeatTarget<O> heartbeatTarget;
+
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ /** Listener which is notified about heartbeat timeouts */
+ private final HeartbeatListener<?, ?> heartbeatListener;
+
+ /** Maximum heartbeat timeout interval */
+ private final long heartbeatTimeoutIntervalMs;
+
+ private volatile ScheduledFuture<?> futureTimeout;
+
+ private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
+
+ HeartbeatMonitor(
+ ResourceID resourceID,
+ HeartbeatTarget<O> heartbeatTarget,
+ ScheduledExecutorService scheduledExecutorService,
+ HeartbeatListener<?, O> heartbeatListener,
+ long heartbeatTimeoutIntervalMs) {
+
+ this.resourceID = Preconditions.checkNotNull(resourceID);
+ this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget);
+ this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+ this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener);
+
+ Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat timeout interval has to be larger than 0.");
+ this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
+
+ resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
+ }
+
+ HeartbeatTarget<O> getHeartbeatTarget() {
+ return heartbeatTarget;
+ }
+
+ void reportHeartbeat() {
+ resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
+ }
+
+ void resetHeartbeatTimeout(long heartbeatTimeout) {
+ if (state.get() == State.RUNNING) {
+ cancelTimeout();
+
+ futureTimeout = scheduledExecutorService.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);
+
+ // Double check for concurrent accesses (e.g. a firing of the scheduled future)
+ if (state.get() != State.RUNNING) {
+ cancelTimeout();
+ }
+ }
+ }
+
+ void cancel() {
+ // we can only cancel if we are in state running
+ if (state.compareAndSet(State.RUNNING, State.CANCELED)) {
+ cancelTimeout();
+ }
+ }
+
+ private void cancelTimeout() {
+ if (futureTimeout != null) {
+ futureTimeout.cancel(true);
+ }
+ }
+
+ public boolean isCanceled() {
+ return state.get() == State.CANCELED;
+ }
+
+ @Override
+ public void run() {
+ // The heartbeat has timed out if we're in state running
+ if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
+ heartbeatListener.notifyHeartbeatTimeout(resourceID);
+ }
+ }
+
+ private enum State {
+ RUNNING,
+ TIMEOUT,
+ CANCELED
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
new file mode 100644
index 0000000..588ba7f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.slf4j.Logger;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link HeartbeatManager} implementation which regularly requests a heartbeat response from
+ * its monitored {@link HeartbeatTarget}. The heartbeat period is configurable.
+ *
+ * <p>The periodic trigger is scheduled once, when the manager is constructed, and only issues
+ * requests while the manager is started. {@link #stop()} cancels this trigger; it is not
+ * rescheduled by a later call to {@link #start(HeartbeatListener)}.
+ *
+ * @param <I> Type of the incoming heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload
+ */
+public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
+
+	/** Handle of the periodic heartbeat request trigger */
+	private final ScheduledFuture<?> triggerFuture;
+
+	/**
+	 * Creates the heartbeat manager and schedules its periodic heartbeat request trigger.
+	 *
+	 * @param heartbeatPeriod Interval in milliseconds between two rounds of heartbeat requests
+	 * @param heartbeatTimeout Heartbeat timeout interval in milliseconds
+	 * @param ownResourceID Resource ID attached to the outgoing heartbeat requests
+	 * @param executorService Executor used to run future callbacks
+	 * @param scheduledExecutorService Executor used for the trigger and the heartbeat timeouts
+	 * @param log Logger of the heartbeat manager
+	 */
+	public HeartbeatManagerSenderImpl(
+			long heartbeatPeriod,
+			long heartbeatTimeout,
+			ResourceID ownResourceID,
+			ExecutorService executorService,
+			ScheduledExecutorService scheduledExecutorService,
+			Logger log) {
+		super(heartbeatTimeout, ownResourceID, executorService, scheduledExecutorService, log);
+
+		triggerFuture = scheduledExecutorService.scheduleAtFixedRate(this, 0L, heartbeatPeriod, TimeUnit.MILLISECONDS);
+	}
+
+	@Override
+	public void run() {
+		if (!stopped) {
+			log.debug("Trigger heartbeat request.");
+			for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) {
+				try {
+					Future<O> futurePayload = getHeartbeatListener().retrievePayload();
+					final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
+
+					if (futurePayload != null) {
+						futurePayload.thenAcceptAsync(new AcceptFunction<O>() {
+							@Override
+							public void accept(O payload) {
+								heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
+							}
+						}, getExecutor());
+					} else {
+						heartbeatTarget.requestHeartbeat(getOwnResourceID(), null);
+					}
+				} catch (Exception e) {
+					// scheduleAtFixedRate suppresses all subsequent executions once a run throws,
+					// so one failing listener call or target must not end heartbeat triggering.
+					log.warn("Failed to request a heartbeat from a heartbeat target.", e);
+				}
+			}
+		}
+	}
+
+	@Override
+	public void stop() {
+		triggerFuture.cancel(true);
+		super.stop();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
new file mode 100644
index 0000000..ef953de
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+/**
+ * Interface for components which can be sent heartbeats and from which one can request a
+ * heartbeat response. Both the heartbeat response as well as the heartbeat request can carry a
+ * payload. This payload is reported to the heartbeat target and contains additional information.
+ * The payload can be empty which is indicated by a null value.
+ *
+ * @param <I> Type of the payload which is sent to the heartbeat target
+ */
+public interface HeartbeatTarget<I> {
+
+	/**
+	 * Sends a heartbeat response to the target. Each heartbeat response can carry a payload which
+	 * contains additional information for the heartbeat target.
+	 *
+	 * @param resourceID Resource ID identifying the machine for which a heartbeat shall be reported.
+	 * @param payload Payload of the heartbeat response. Null indicates an empty payload.
+	 */
+	void sendHeartbeat(ResourceID resourceID, I payload);
+
+	/**
+	 * Requests a heartbeat from the target. Each heartbeat request can carry a payload which
+	 * contains additional information for the heartbeat target.
+	 *
+	 * @param resourceID Resource ID identifying the machine issuing the heartbeat request.
+	 * @param payload Payload of the heartbeat request. Null indicates an empty payload.
+	 */
+	void requestHeartbeat(ResourceID resourceID, I payload);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
new file mode 100644
index 0000000..1c62f17
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.util.DirectExecutorService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests for the {@link HeartbeatManagerImpl} and the {@link HeartbeatManagerSenderImpl}.
+ *
+ * <p>Tests which use a real {@link ScheduledThreadPoolExecutor} shut it down in a
+ * {@code finally} block so that no scheduler threads outlive the test.
+ */
+public class HeartbeatManagerTest extends TestLogger {
+ private static final Logger LOG = LoggerFactory.getLogger(HeartbeatManagerTest.class);
+
+ /**
+ * Tests that a regular heartbeat signal triggers the right callback functions in the
+ * {@link HeartbeatListener}.
+ */
+ @Test
+ public void testRegularHeartbeat() {
+ long heartbeatTimeout = 1000L;
+ ResourceID ownResourceID = new ResourceID("foobar");
+ ResourceID targetResourceID = new ResourceID("barfoo");
+ HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
+ ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+
+ Object expectedObject = new Object();
+
+ when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject));
+
+ HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
+ heartbeatTimeout,
+ ownResourceID,
+ new DirectExecutorService(),
+ scheduledExecutorService,
+ LOG);
+
+ heartbeatManager.start(heartbeatListener);
+
+ HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
+
+ heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget);
+
+ // a heartbeat request reports the payload and is answered with the listener's own payload
+ heartbeatManager.requestHeartbeat(targetResourceID, expectedObject);
+
+ verify(heartbeatListener, times(1)).reportPayload(targetResourceID, expectedObject);
+ verify(heartbeatListener, times(1)).retrievePayload();
+ verify(heartbeatTarget, times(1)).sendHeartbeat(ownResourceID, expectedObject);
+
+ // a heartbeat response only reports the payload
+ heartbeatManager.sendHeartbeat(targetResourceID, expectedObject);
+
+ verify(heartbeatListener, times(2)).reportPayload(targetResourceID, expectedObject);
+ }
+
+ /**
+ * Tests that the heartbeat monitors are updated when receiving a new heartbeat signal.
+ */
+ @Test
+ public void testHeartbeatMonitorUpdate() {
+ long heartbeatTimeout = 1000L;
+ ResourceID ownResourceID = new ResourceID("foobar");
+ ResourceID targetResourceID = new ResourceID("barfoo");
+ HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
+ ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+ ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
+
+ doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+ Object expectedObject = new Object();
+
+ when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(expectedObject));
+
+ HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
+ heartbeatTimeout,
+ ownResourceID,
+ new DirectExecutorService(),
+ scheduledExecutorService,
+ LOG);
+
+ heartbeatManager.start(heartbeatListener);
+
+ HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
+
+ heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget);
+
+ heartbeatManager.sendHeartbeat(targetResourceID, expectedObject);
+
+ // the received heartbeat is expected to cancel the pending timeout and schedule a new one
+ verify(scheduledFuture, times(1)).cancel(true);
+ verify(scheduledExecutorService, times(2)).schedule(any(Runnable.class), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Tests that a heartbeat timeout is signaled if the heartbeat is not reported in time.
+ *
+ * @throws Exception if the test thread is interrupted or waiting for the timeout fails
+ */
+ @Test
+ public void testHeartbeatTimeout() throws Exception {
+ long heartbeatTimeout = 100L;
+ int numHeartbeats = 10;
+ long heartbeatInterval = 20L;
+ Object payload = new Object();
+
+ ResourceID ownResourceID = new ResourceID("foobar");
+ ResourceID targetResourceID = new ResourceID("barfoo");
+ TestingHeartbeatListener heartbeatListener = new TestingHeartbeatListener(payload);
+ ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+ Object expectedObject = new Object();
+
+ try {
+ HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
+ heartbeatTimeout,
+ ownResourceID,
+ new DirectExecutorService(),
+ scheduledExecutorService,
+ LOG);
+
+ heartbeatManager.start(heartbeatListener);
+
+ HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
+
+ Future<ResourceID> timeoutFuture = heartbeatListener.getTimeoutFuture();
+
+ heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget);
+
+ // keep the target alive by reporting heartbeats well within the timeout interval
+ for (int i = 0; i < numHeartbeats; i++) {
+ heartbeatManager.sendHeartbeat(targetResourceID, expectedObject);
+ Thread.sleep(heartbeatInterval);
+ }
+
+ assertFalse(timeoutFuture.isDone());
+
+ // once heartbeats stop arriving, the timeout must be reported for the target
+ ResourceID timeoutResourceID = timeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
+
+ assertEquals(targetResourceID, timeoutResourceID);
+ } finally {
+ scheduledExecutorService.shutdownNow();
+ }
+ }
+
+ /**
+ * Tests the heartbeat interplay between the {@link HeartbeatManagerImpl} and the
+ * {@link HeartbeatManagerSenderImpl}. The sender should regularly trigger heartbeat requests
+ * which are fulfilled by the receiver. Upon stopping the receiver, the sender should notify
+ * the heartbeat listener about the heartbeat timeout.
+ *
+ * @throws Exception if the test thread is interrupted or waiting for the timeout fails
+ */
+ @Test
+ public void testHeartbeatCluster() throws Exception {
+ long heartbeatTimeout = 100L;
+ long heartbeatPeriod = 20L;
+ Object object = new Object();
+ Object object2 = new Object();
+ ResourceID resourceID = new ResourceID("foobar");
+ ResourceID resourceID2 = new ResourceID("barfoo");
+ HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
+
+ when(heartbeatListener.retrievePayload()).thenReturn(FlinkCompletableFuture.completed(object));
+
+ TestingHeartbeatListener heartbeatListener2 = new TestingHeartbeatListener(object2);
+
+ Future<ResourceID> futureTimeout = heartbeatListener2.getTimeoutFuture();
+
+ ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+ ScheduledExecutorService scheduledExecutorService2 = new ScheduledThreadPoolExecutor(1);
+
+ try {
+ HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
+ heartbeatTimeout,
+ resourceID,
+ new DirectExecutorService(),
+ scheduledExecutorService,
+ LOG);
+
+ HeartbeatManagerSenderImpl<Object, Object> heartbeatManager2 = new HeartbeatManagerSenderImpl<>(
+ heartbeatPeriod,
+ heartbeatTimeout,
+ resourceID2,
+ new DirectExecutorService(),
+ scheduledExecutorService2,
+ LOG);
+
+ heartbeatManager.start(heartbeatListener);
+ heartbeatManager2.start(heartbeatListener2);
+
+ heartbeatManager.monitorTarget(resourceID2, heartbeatManager2);
+ heartbeatManager2.monitorTarget(resourceID, heartbeatManager);
+
+ Thread.sleep(2 * heartbeatTimeout);
+
+ assertFalse(futureTimeout.isDone());
+
+ // once the receiver is stopped, the sender is expected to report a heartbeat timeout for it
+ heartbeatManager.stop();
+
+ ResourceID timeoutResourceID = futureTimeout.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
+
+ assertEquals(resourceID, timeoutResourceID);
+
+ // upper bound of heartbeats in the observation window; only half of them are required
+ // to keep the assertion tolerant of scheduling jitter
+ int numberHeartbeats = (int) (2 * heartbeatTimeout / heartbeatPeriod);
+
+ verify(heartbeatListener, atLeast(numberHeartbeats / 2)).reportPayload(resourceID2, object2);
+ assertTrue(heartbeatListener2.getNumberHeartbeatReports() >= numberHeartbeats / 2);
+ } finally {
+ scheduledExecutorService.shutdownNow();
+ scheduledExecutorService2.shutdownNow();
+ }
+ }
+
+ /**
+ * Tests that after unmonitoring a target, there won't be a timeout triggered.
+ */
+ @Test
+ public void testTargetUnmonitoring() throws InterruptedException, ExecutionException {
+ // this might be too aggressive for Travis, let's see...
+ long heartbeatTimeout = 100L;
+ ResourceID resourceID = new ResourceID("foobar");
+ ResourceID targetID = new ResourceID("target");
+ Object object = new Object();
+ ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+ try {
+ HeartbeatManager<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
+ heartbeatTimeout,
+ resourceID,
+ new DirectExecutorService(),
+ scheduledExecutorService,
+ LOG);
+
+ TestingHeartbeatListener heartbeatListener = new TestingHeartbeatListener(object);
+
+ heartbeatManager.start(heartbeatListener);
+
+ heartbeatManager.monitorTarget(targetID, mock(HeartbeatTarget.class));
+
+ heartbeatManager.unmonitorTarget(targetID);
+
+ Future<ResourceID> timeout = heartbeatListener.getTimeoutFuture();
+
+ try {
+ timeout.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
+ fail("Timeout should time out.");
+ } catch (TimeoutException ignored) {
+ // the timeout should not be completed since we unmonitored the target
+ }
+ } finally {
+ scheduledExecutorService.shutdownNow();
+ }
+ }
+
+ /**
+ * {@link HeartbeatListener} for tests: completes a future with the resource ID passed to
+ * {@link #notifyHeartbeatTimeout(ResourceID)}, counts the reported payloads and always
+ * returns the same payload from {@link #retrievePayload()}.
+ */
+ static class TestingHeartbeatListener implements HeartbeatListener<Object, Object> {
+
+ private final CompletableFuture<ResourceID> future = new FlinkCompletableFuture<>();
+
+ private final Object payload;
+
+ // reportPayload can be called from a heartbeat manager's scheduler thread while the test
+ // thread reads the count; a plain int would be neither atomic nor safely published
+ private final AtomicInteger numberHeartbeatReports = new AtomicInteger();
+
+ TestingHeartbeatListener(Object payload) {
+ this.payload = payload;
+ }
+
+ public Future<ResourceID> getTimeoutFuture() {
+ return future;
+ }
+
+ public int getNumberHeartbeatReports() {
+ return numberHeartbeatReports.get();
+ }
+
+ @Override
+ public void notifyHeartbeatTimeout(ResourceID resourceID) {
+ future.complete(resourceID);
+ }
+
+ @Override
+ public void reportPayload(ResourceID resourceID, Object payload) {
+ numberHeartbeatReports.incrementAndGet();
+ }
+
+ @Override
+ public Future<Object> retrievePayload() {
+ return FlinkCompletableFuture.completed(payload);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e4eb4f9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 805ea71..a87fe42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -43,12 +42,9 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import java.util.Collections;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;