Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/22 21:01:49 UTC

[3/4] nifi git commit: NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
new file mode 100644
index 0000000..d6838cd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
+
+    private final int heartbeatIntervalMillis;
+    private static final Logger logger = LoggerFactory.getLogger(AbstractHeartbeatMonitor.class);
+    protected final ClusterCoordinator clusterCoordinator;
+    protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true);
+
+    protected volatile long latestHeartbeatTime;
+    private volatile ScheduledFuture<?> future;
+    private volatile boolean stopped = true;
+
+
+    public AbstractHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) {
+        this.clusterCoordinator = clusterCoordinator;
+        final String heartbeatInterval = properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL,
+            NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL);
+        this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public final void start() {
+        stopped = false;
+        onStart();
+
+        this.future = flowEngine.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    monitorHeartbeats();
+                } catch (final Exception e) {
+                    clusterCoordinator.reportEvent(null, Severity.ERROR, "Failed to process heartbeats from nodes due to " + e.toString());
+                    logger.error("Failed to process heartbeats", e);
+                }
+            }
+        }, heartbeatIntervalMillis, heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public final void stop() {
+        this.stopped = true;
+
+        try {
+            if (future != null) {
+                future.cancel(true);
+            }
+        } finally {
+            onStop();
+        }
+    }
+
+    protected boolean isStopped() {
+        return stopped;
+    }
+
+    @Override
+    public NodeHeartbeat getLatestHeartbeat(final NodeIdentifier nodeId) {
+        return getLatestHeartbeats().get(nodeId);
+    }
+
+    protected ClusterCoordinator getClusterCoordinator() {
+        return clusterCoordinator;
+    }
+
+    protected long getHeartbeatInterval(final TimeUnit timeUnit) {
+        return timeUnit.convert(heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Fetches all of the latest heartbeats and updates the Cluster Coordinator as appropriate,
+     * based on the heartbeats received.
+     *
+     * Visible for testing.
+     */
+    protected synchronized void monitorHeartbeats() {
+        final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = getLatestHeartbeats();
+        if (latestHeartbeats == null || latestHeartbeats.isEmpty()) {
+            // failed to fetch heartbeats; don't change anything.
+            clusterCoordinator.reportEvent(null, Severity.WARNING, "Failed to retrieve any new heartbeat information for nodes. "
+                + "Will not make any decisions based on heartbeats.");
+            return;
+        }
+
+        final StopWatch procStopWatch = new StopWatch(true);
+        for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) {
+            try {
+                processHeartbeat(heartbeat);
+            } catch (final Exception e) {
+                clusterCoordinator.reportEvent(null, Severity.ERROR,
+                    "Received heartbeat from " + heartbeat.getNodeIdentifier() + " but failed to process heartbeat due to " + e);
+                logger.error("Failed to process heartbeat from {} due to {}", heartbeat.getNodeIdentifier(), e.toString());
+                logger.error("", e);
+            }
+        }
+
+        procStopWatch.stop();
+        logger.info("Finished processing {} heartbeats in {}", latestHeartbeats.size(), procStopWatch.getDuration());
+
+        // Disconnect any node that hasn't sent a heartbeat in a long time (8 times the heartbeat interval)
+        final long maxMillis = heartbeatIntervalMillis * 8L;
+        final long threshold = latestHeartbeatTime - maxMillis;
+        for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) {
+            if (heartbeat.getTimestamp() < threshold) {
+                clusterCoordinator.requestNodeDisconnect(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT,
+                    "Latest heartbeat from Node has expired");
+
+                try {
+                    removeHeartbeat(heartbeat.getNodeIdentifier());
+                } catch (final Exception e) {
+                    logger.warn("Failed to remove heartbeat for {} due to {}", heartbeat.getNodeIdentifier(), e.toString());
+                    logger.warn("", e);
+                }
+            }
+        }
+    }
+
+    private void processHeartbeat(final NodeHeartbeat heartbeat) {
+        final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
+
+        // Do not process heartbeat if it's blocked by firewall.
+        if (clusterCoordinator.isBlockedByFirewall(nodeId.getSocketAddress())) {
+            clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Firewall blocked received heartbeat. Issuing disconnection request.");
+
+            // request node to disconnect
+            clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.BLOCKED_BY_FIREWALL, "Blocked by Firewall");
+            removeHeartbeat(nodeId);
+            return;
+        }
+
+        final NodeConnectionStatus connectionStatus = clusterCoordinator.getConnectionStatus(nodeId);
+        if (connectionStatus == null) {
+            final NodeConnectionState hbConnectionState = heartbeat.getConnectionStatus().getState();
+            if (hbConnectionState == NodeConnectionState.DISCONNECTED || hbConnectionState == NodeConnectionState.DISCONNECTING) {
+                // Node is not part of the cluster. Remove heartbeat and move on.
+                removeHeartbeat(nodeId);
+                return;
+            }
+
+            // Unknown node. Issue reconnect request
+            clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from unknown node. Removing heartbeat and requesting that node connect to cluster.");
+            removeHeartbeat(nodeId);
+
+            clusterCoordinator.requestNodeConnect(nodeId);
+            return;
+        }
+
+        final DisconnectionCode reportedDisconnectCode = heartbeat.getConnectionStatus().getDisconnectCode();
+        if (reportedDisconnectCode != null) {
+            // Check if the node is notifying us that it wants to disconnect from the cluster
+            final boolean requestingDisconnect;
+            switch (reportedDisconnectCode) {
+                case MISMATCHED_FLOWS:
+                case NODE_SHUTDOWN:
+                case STARTUP_FAILURE:
+                    final NodeConnectionState expectedState = connectionStatus.getState();
+                    requestingDisconnect = expectedState == NodeConnectionState.CONNECTED || expectedState == NodeConnectionState.CONNECTING;
+                    break;
+                default:
+                    requestingDisconnect = false;
+                    break;
+            }
+
+            if (requestingDisconnect) {
+                clusterCoordinator.disconnectionRequestedByNode(nodeId, heartbeat.getConnectionStatus().getDisconnectCode(),
+                    heartbeat.getConnectionStatus().getDisconnectReason());
+                removeHeartbeat(nodeId);
+                return;
+            }
+        }
+
+        final NodeConnectionState connectionState = connectionStatus.getState();
+        if (heartbeat.getConnectionStatus().getState() != NodeConnectionState.CONNECTED && connectionState == NodeConnectionState.CONNECTED) {
+            // Cluster Coordinator believes that node is connected, but node does not believe so.
+            clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Received heartbeat from node that thinks it is not yet part of the cluster, "
+                + "though the Cluster Coordinator thought it was (node claimed state was " + heartbeat.getConnectionStatus().getState()
+                + "). Marking as Disconnected and requesting that Node reconnect to cluster");
+            clusterCoordinator.requestNodeConnect(nodeId);
+            return;
+        }
+
+        if (NodeConnectionState.DISCONNECTED == connectionState) {
+            // ignore heartbeats from nodes disconnected by means other than lack of heartbeat, unless it is
+            // the only node. We allow it if it is the only node because if we have a one-node cluster, then
+            // we cannot manually reconnect it.
+            final DisconnectionCode disconnectionCode = connectionStatus.getDisconnectCode();
+
+            if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT) {
+                // record event
+                clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously "
+                    + "disconnected due to lack of heartbeat. Issuing reconnection request.");
+
+                clusterCoordinator.requestNodeConnect(nodeId);
+            } else {
+                // disconnected nodes should not heartbeat, so we need to issue a disconnection request
+                logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ".  Issuing disconnection request.");
+                clusterCoordinator.requestNodeDisconnect(nodeId, connectionStatus.getDisconnectCode(), connectionStatus.getDisconnectReason());
+                removeHeartbeat(nodeId);
+            }
+
+            return;
+        }
+
+        if (NodeConnectionState.DISCONNECTING == connectionStatus.getState()) {
+            // ignore spurious heartbeat
+            removeHeartbeat(nodeId);
+            return;
+        }
+
+        // first heartbeat causes status change from connecting to connected
+        if (NodeConnectionState.CONNECTING == connectionState) {
+            final Long connectionRequestTime = connectionStatus.getConnectionRequestTime();
+            if (connectionRequestTime != null && heartbeat.getTimestamp() < connectionRequestTime) {
+                clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat but ignoring because it was reported before the node was last asked to reconnect.");
+                removeHeartbeat(nodeId);
+                return;
+            }
+
+            // connection complete
+            clusterCoordinator.finishNodeConnection(nodeId);
+            clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected.");
+        }
+
+        if (heartbeat.isPrimary()) {
+            clusterCoordinator.setPrimaryNode(nodeId);
+        }
+    }
+
+
+    /**
+     * @return the most recent heartbeat information for each node in the cluster
+     */
+    protected abstract Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats();
+
+    /**
+     * This method does nothing in the abstract class but is meant for subclasses to
+     * override in order to provide functionality when the monitor is started.
+     */
+    protected void onStart() {
+    }
+
+    /**
+     * This method does nothing in the abstract class but is meant for subclasses to
+     * override in order to provide functionality when the monitor is stopped.
+     */
+    protected void onStop() {
+    }
+}
\ No newline at end of file
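
A minimal in-memory subclass, sketched here for illustration (the class name InMemoryHeartbeatMonitor and its map-backed storage are hypothetical, not part of this change), shows the contract that AbstractHeartbeatMonitor expects an implementation to fulfil:

package org.apache.nifi.cluster.coordination.heartbeat;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.protocol.NodeIdentifier;

// Hypothetical subclass: keeps heartbeats in memory so that the monitoring loop
// in AbstractHeartbeatMonitor (monitorHeartbeats) can be exercised in isolation.
public class InMemoryHeartbeatMonitor extends AbstractHeartbeatMonitor {
    private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeats = new ConcurrentHashMap<>();

    public InMemoryHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) {
        super(clusterCoordinator, properties);
    }

    // Called by whatever transport receives heartbeats; also advances latestHeartbeatTime,
    // which the abstract class uses when deciding whether a node's heartbeat has expired.
    public void addHeartbeat(final NodeHeartbeat heartbeat) {
        heartbeats.put(heartbeat.getNodeIdentifier(), heartbeat);
        latestHeartbeatTime = Math.max(latestHeartbeatTime, heartbeat.getTimestamp());
    }

    @Override
    protected Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats() {
        return Collections.unmodifiableMap(heartbeats);
    }

    @Override
    public void removeHeartbeat(final NodeIdentifier nodeId) {
        heartbeats.remove(nodeId);
    }
}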

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
new file mode 100644
index 0000000..d9ef0be
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryForever;
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
+import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Receives heartbeats from cluster nodes via the cluster protocol and uses Apache Curator/ZooKeeper
+ * to publish the address to which nodes should send those heartbeats.
+ */
+public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor implements HeartbeatMonitor, ProtocolHandler {
+    protected static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class);
+    private static final String COORDINATOR_ZNODE_NAME = "coordinator";
+
+    private final ZooKeeperClientConfig zkClientConfig;
+    private final String clusterNodesPath;
+
+    private volatile CuratorFramework curatorClient;
+    private volatile Map<String, NodeIdentifier> clusterNodeIds = new HashMap<>();
+
+    private final String heartbeatAddress;
+    private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages = new ConcurrentHashMap<>();
+
+    protected static final Unmarshaller nodeIdentifierUnmarshaller;
+
+    static {
+        try {
+            final JAXBContext jaxbContext = JAXBContext.newInstance(NodeIdentifier.class);
+            nodeIdentifierUnmarshaller = jaxbContext.createUnmarshaller();
+        } catch (final Exception e) {
+            throw new RuntimeException("Failed to create an Unmarshaller for unmarshalling Node Identifier", e);
+        }
+    }
+
+
+    public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) {
+        super(clusterCoordinator, properties);
+
+        this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties);
+        this.clusterNodesPath = zkClientConfig.resolvePath("cluster/nodes");
+
+        String hostname = properties.getProperty(NiFiProperties.CLUSTER_MANAGER_ADDRESS);
+        if (hostname == null) {
+            try {
+                hostname = InetAddress.getLocalHost().getHostName();
+            } catch (UnknownHostException e) {
+                throw new RuntimeException("Unable to determine local hostname and the '" + NiFiProperties.CLUSTER_MANAGER_ADDRESS + "' property is not set");
+            }
+        }
+
+        final String port = properties.getProperty(NiFiProperties.CLUSTER_MANAGER_PROTOCOL_PORT);
+        if (port == null) {
+            throw new RuntimeException("Unable to determine which port Cluster Manager Protocol is listening on because the '"
+                + NiFiProperties.CLUSTER_MANAGER_PROTOCOL_PORT + "' property is not set");
+        }
+
+        try {
+            Integer.parseInt(port);
+        } catch (final NumberFormatException nfe) {
+            throw new RuntimeException("Unable to determine which port Cluster Manager Protocol is listening on because the '"
+                + NiFiProperties.CLUSTER_MANAGER_PROTOCOL_PORT + "' property is set to '" + port + "', which is not a valid port number.", nfe);
+        }
+
+        heartbeatAddress = hostname + ":" + port;
+    }
+
+    @Override
+    public void onStart() {
+        final RetryPolicy retryPolicy = new RetryForever(5000);
+        curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(),
+            zkClientConfig.getSessionTimeoutMillis(), zkClientConfig.getConnectionTimeoutMillis(), retryPolicy);
+        curatorClient.start();
+
+        final Thread publishAddress = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (!isStopped()) {
+                    final String path = clusterNodesPath + "/" + COORDINATOR_ZNODE_NAME;
+                    try {
+                        try {
+                            curatorClient.setData().forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8));
+                            return;
+                        } catch (final NoNodeException nne) {
+                            // ensure that parents are created, using a wide-open ACL because the parents contain no data
+                            // and the path is not in any way sensitive.
+                            try {
+                                curatorClient.create().creatingParentContainersIfNeeded().forPath(path);
+                            } catch (final NodeExistsException nee) {
+                                // This is okay. Node already exists.
+                            }
+
+                            curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8));
+                            logger.info("Successfully created node in ZooKeeper with path {}", path);
+
+                            return;
+                        }
+                    } catch (final Exception e) {
+                        logger.warn("Failed to update ZooKeeper to notify nodes of the heartbeat address. Will continue to retry.", e);
+
+                        try {
+                            Thread.sleep(2000L);
+                        } catch (final InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                            return;
+                        }
+                    }
+                }
+            }
+        });
+
+        publishAddress.setName("Publish Heartbeat Address");
+        publishAddress.setDaemon(true);
+        publishAddress.start();
+    }
+
+    private CuratorFramework getClient() {
+        return curatorClient;
+    }
+
+    @Override
+    public void onStop() {
+        final CuratorFramework client = getClient();
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Override
+    protected Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats() {
+        return Collections.unmodifiableMap(heartbeatMessages);
+    }
+
+    @Override
+    public synchronized void removeHeartbeat(final NodeIdentifier nodeId) {
+        logger.debug("Deleting heartbeat for node {}", nodeId);
+        final String nodeInfoPath = clusterNodesPath + "/" + nodeId.getId();
+
+        heartbeatMessages.remove(nodeId);
+
+        try {
+            getClient().delete().forPath(nodeInfoPath);
+            logger.info("Removed heartbeat from ZooKeeper for Node {}", nodeId);
+        } catch (final NoNodeException e) {
+            // node did not exist. Just return.
+            logger.debug("Attempted to remove heartbeat for Node with ID {} but no ZNode existed at {}", nodeId, nodeInfoPath);
+            return;
+        } catch (final Exception e) {
+            logger.warn("Failed to remove heartbeat from ZooKeeper for Node {} due to {}", nodeId, e);
+            logger.warn("", e);
+
+            clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Failed to remove node's heartbeat from ZooKeeper due to " + e);
+        }
+    }
+
+    protected Set<NodeIdentifier> getClusterNodeIds() {
+        return new HashSet<>(clusterNodeIds.values());
+    }
+
+
+    @Override
+    public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
+        if (msg.getType() != MessageType.HEARTBEAT) {
+            throw new ProtocolException("Cannot handle message of type " + msg.getType());
+        }
+
+        final HeartbeatMessage heartbeatMsg = (HeartbeatMessage) msg;
+        final Heartbeat heartbeat = heartbeatMsg.getHeartbeat();
+
+        final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
+        final NodeConnectionStatus connectionStatus = heartbeat.getConnectionStatus();
+        final boolean primary = heartbeat.isPrimary();
+        final byte[] payloadBytes = heartbeat.getPayload();
+        final HeartbeatPayload payload = HeartbeatPayload.unmarshal(payloadBytes);
+        final int activeThreadCount = payload.getActiveThreadCount();
+        final int flowFileCount = (int) payload.getTotalFlowFileCount();
+        final long flowFileBytes = payload.getTotalFlowFileBytes();
+        final long systemStartTime = payload.getSystemStartTime();
+
+        final NodeHeartbeat nodeHeartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(),
+            connectionStatus, primary, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime);
+        heartbeatMessages.put(heartbeat.getNodeIdentifier(), nodeHeartbeat);
+        logger.debug("Received new heartbeat from {}", nodeId);
+
+        return null;
+    }
+
+    @Override
+    public boolean canHandle(ProtocolMessage msg) {
+        return msg.getType() == MessageType.HEARTBEAT;
+    }
+}
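
A condensed sketch of how this monitor is wired up, mirroring the WebClusterManager changes further down in this commit ('clusterManager', 'senderListener', and 'properties' stand in for the cluster manager instance, its ClusterManagerProtocolSenderListener, and the NiFiProperties already available there):

// Construct the coordinator and monitor, register the monitor as a protocol handler, then start it.
final ClusterCoordinator coordinator = new WebClusterManagerCoordinator(clusterManager, senderListener);
final ClusterProtocolHeartbeatMonitor heartbeatMonitor = new ClusterProtocolHeartbeatMonitor(coordinator, properties);
senderListener.addHandler(heartbeatMonitor);  // HEARTBEAT protocol messages are now routed to handle()
heartbeatMonitor.start();                     // publishes the heartbeat address to ZooKeeper and schedules monitorHeartbeats()

// On shutdown: cancels the scheduled monitoring task and closes the Curator client.
heartbeatMonitor.stop();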

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
new file mode 100644
index 0000000..c151382
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * A HeartbeatMonitor is responsible for monitoring some remote resource for heartbeats from each
+ * node in a cluster and reacting to those heartbeats (or lack thereof).
+ */
+public interface HeartbeatMonitor {
+
+    /**
+     * Begin monitoring for heartbeats
+     */
+    void start();
+
+    /**
+     * Stop monitoring heartbeats
+     */
+    void stop();
+
+    /**
+     * Returns the latest heartbeat that has been obtained for the node with
+     * the given id
+     *
+     * @param nodeId the id of the node whose heartbeat should be retrieved
+     * @return the latest heartbeat that has been obtained for the node with
+     *         the given id, or <code>null</code> if no heartbeat has been obtained
+     */
+    NodeHeartbeat getLatestHeartbeat(NodeIdentifier nodeId);
+
+    /**
+     * Removes the heartbeat for the given node from the monitor and the
+     * remote location where heartbeats are sent
+     *
+     * @param nodeId the id of the node whose heartbeat should be removed
+     */
+    void removeHeartbeat(NodeIdentifier nodeId);
+}
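
As a usage sketch, a caller holding a HeartbeatMonitor can summarize the last-known state of a node; the helper method below is illustrative only and assumed to live in the same package so NodeHeartbeat is visible:

// Illustrative helper: report the latest heartbeat for a node, or note that none exists.
public static String describeNode(final HeartbeatMonitor monitor, final NodeIdentifier nodeId) {
    final NodeHeartbeat heartbeat = monitor.getLatestHeartbeat(nodeId);
    if (heartbeat == null) {
        return nodeId.getId() + ": no heartbeat received";
    }
    return String.format("%s: %s, %d FlowFiles (%d bytes), %d active threads, last heartbeat %tc",
        nodeId.getId(), heartbeat.getConnectionStatus().getState(), heartbeat.getFlowFileCount(),
        heartbeat.getFlowFileBytes(), heartbeat.getActiveThreadCount(), heartbeat.getTimestamp());
}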

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
new file mode 100644
index 0000000..bd66022
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+public interface NodeHeartbeat {
+    /**
+     * @return the time at which the node reported the heartbeat, according
+     *         to the system that received the heartbeat
+     */
+    long getTimestamp();
+
+    /**
+     * @return the identifier of the node that sent the heartbeat
+     */
+    NodeIdentifier getNodeIdentifier();
+
+    /**
+     * @return the Connection Status reported by the node
+     */
+    NodeConnectionStatus getConnectionStatus();
+
+    /**
+     * @return <code>true</code> if the node is the Primary Node in the cluster, <code>false</code> otherwise
+     */
+    boolean isPrimary();
+
+    /**
+     * @return the number of FlowFiles that are queued up on the node
+     */
+    int getFlowFileCount();
+
+    /**
+     * @return the total size of all FlowFiles that are queued up on the node
+     */
+    long getFlowFileBytes();
+
+    /**
+     * @return the number of threads that are actively running in Processors and Reporting Tasks on the node
+     */
+    int getActiveThreadCount();
+
+    /**
+     * @return the time that the node reports having started NiFi
+     */
+    long getSystemStartTime();
+}
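
Since each NodeHeartbeat carries per-node queue and thread counters, cluster-wide totals can be derived by summing the latest heartbeat from every node; a small illustrative sketch (the HeartbeatTotals class is hypothetical):

import java.util.Collection;

// Hypothetical aggregation over the most recent heartbeat from each node,
// e.g. for a summary log line or a cluster status view.
public final class HeartbeatTotals {
    public static String summarize(final Collection<NodeHeartbeat> heartbeats) {
        int flowFiles = 0;
        long flowFileBytes = 0L;
        int activeThreads = 0;
        for (final NodeHeartbeat heartbeat : heartbeats) {
            flowFiles += heartbeat.getFlowFileCount();
            flowFileBytes += heartbeat.getFlowFileBytes();
            activeThreads += heartbeat.getActiveThreadCount();
        }
        return heartbeats.size() + " nodes, " + flowFiles + " FlowFiles (" + flowFileBytes
            + " bytes), " + activeThreads + " active threads";
    }
}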

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
new file mode 100644
index 0000000..133bab0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+
+public class StandardNodeHeartbeat implements NodeHeartbeat {
+
+    private final NodeIdentifier nodeId;
+    private final long timestamp;
+    private final NodeConnectionStatus connectionStatus;
+    private final boolean primary;
+    private final int flowFileCount;
+    private final long flowFileBytes;
+    private final int activeThreadCount;
+    private final long systemStartTime;
+
+    public StandardNodeHeartbeat(final NodeIdentifier nodeId, final long timestamp, final NodeConnectionStatus connectionStatus,
+        final boolean primary, final int flowFileCount, final long flowFileBytes, final int activeThreadCount, final long systemStartTime) {
+        this.timestamp = timestamp;
+        this.nodeId = nodeId;
+        this.connectionStatus = connectionStatus;
+        this.primary = primary;
+        this.flowFileCount = flowFileCount;
+        this.flowFileBytes = flowFileBytes;
+        this.activeThreadCount = activeThreadCount;
+        this.systemStartTime = systemStartTime;
+    }
+
+    @Override
+    public NodeIdentifier getNodeIdentifier() {
+        return nodeId;
+    }
+
+    @Override
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public NodeConnectionStatus getConnectionStatus() {
+        return connectionStatus;
+    }
+
+    @Override
+    public boolean isPrimary() {
+        return primary;
+    }
+
+    @Override
+    public int getFlowFileCount() {
+        return flowFileCount;
+    }
+
+    @Override
+    public long getFlowFileBytes() {
+        return flowFileBytes;
+    }
+
+    @Override
+    public int getActiveThreadCount() {
+        return activeThreadCount;
+    }
+
+
+    @Override
+    public long getSystemStartTime() {
+        return systemStartTime;
+    }
+
+    public static StandardNodeHeartbeat fromHeartbeatMessage(final HeartbeatMessage message, final long timestamp) {
+        final Heartbeat heartbeat = message.getHeartbeat();
+        final HeartbeatPayload payload = HeartbeatPayload.unmarshal(heartbeat.getPayload());
+
+        return new StandardNodeHeartbeat(heartbeat.getNodeIdentifier(), timestamp, heartbeat.getConnectionStatus(),
+            heartbeat.isPrimary(), (int) payload.getTotalFlowFileCount(), payload.getTotalFlowFileBytes(),
+            payload.getActiveThreadCount(), payload.getSystemStartTime());
+    }
+}
\ No newline at end of file
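
The static fromHeartbeatMessage factory performs the same payload unmarshalling that ClusterProtocolHeartbeatMonitor.handle() spells out field by field, so a handler could be condensed to something like the following sketch (assuming a HeartbeatMessage 'msg' is in scope):

// Equivalent, more compact conversion of an incoming HeartbeatMessage into the
// immutable NodeHeartbeat representation stored by the monitor.
final NodeHeartbeat nodeHeartbeat = StandardNodeHeartbeat.fromHeartbeatMessage(msg, System.currentTimeMillis());
heartbeatMessages.put(nodeHeartbeat.getNodeIdentifier(), nodeHeartbeat);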

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterNode.java
new file mode 100644
index 0000000..d04d144
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterNode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+public class ClusterNode {
+    private final NodeIdentifier nodeId;
+    private NodeConnectionStatus connectionStatus = new NodeConnectionStatus(NodeConnectionState.DISCONNECTED, DisconnectionCode.NOT_YET_CONNECTED);
+
+
+    public ClusterNode(final NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public NodeIdentifier getIdentifier() {
+        return nodeId;
+    }
+
+    public NodeConnectionStatus getConnectionStatus() {
+        return connectionStatus;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
index 27ada88..de3c23e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
@@ -29,7 +29,6 @@ import org.apache.nifi.cluster.node.Node;
 import org.apache.nifi.cluster.node.Node.Status;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.remote.cluster.NodeInformant;
 import org.apache.nifi.reporting.BulletinRepository;
@@ -49,14 +48,6 @@ import org.apache.nifi.reporting.BulletinRepository;
 public interface ClusterManager extends NodeInformant {
 
     /**
-     * Handles a node's heartbeat.
-     *
-     * @param heartbeat a heartbeat
-     *
-     */
-    void handleHeartbeat(Heartbeat heartbeat);
-
-    /**
      * @param statuses the statuses of the nodes
      * @return the set of nodes
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index aefd307..d4ea1d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -33,16 +33,12 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.Queue;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
@@ -71,9 +67,11 @@ import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
-import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.context.ClusterContext;
 import org.apache.nifi.cluster.context.ClusterContextImpl;
+import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
+import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.event.Event;
 import org.apache.nifi.cluster.event.EventManager;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
@@ -104,7 +102,6 @@ import org.apache.nifi.cluster.node.Node;
 import org.apache.nifi.cluster.node.Node.Status;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
@@ -113,12 +110,9 @@ import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListene
 import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManagerProvider;
@@ -290,7 +284,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public static final String BULLETIN_CATEGORY = "Clustering";
 
     private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(WebClusterManager.class));
-    private static final Logger heartbeatLogger = new NiFiLog(LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"));
 
     /**
      * The HTTP header to store a cluster context. An example of what may be stored in the context is a node's auditable actions in response to a cluster request. The cluster context is serialized
@@ -385,10 +378,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     private final ClusterManagerProtocolSenderListener senderListener;
     private final OptimisticLockingManager optimisticLockingManager;
     private final StringEncryptor encryptor;
-    private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>();
     private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock(true);
     private final ClusterManagerLock readLock = new ClusterManagerLock(resourceRWLock.readLock(), "Read");
     private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write");
+    private final ClusterProtocolHeartbeatMonitor heartbeatMonitor;
+    private final WebClusterManagerCoordinator clusterCoordinator;
 
     private final Set<Node> nodes = new HashSet<>();
     private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
@@ -396,8 +390,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     // null means the dataflow should be read from disk
     private StandardDataFlow cachedDataFlow = null;
     private NodeIdentifier primaryNodeId = null;
-    private Timer heartbeatMonitor;
-    private Timer heartbeatProcessor;
     private volatile ClusterServicesBroadcaster servicesBroadcaster = null;
     private volatile EventManager eventManager = null;
     private volatile ClusterNodeFirewall clusterFirewall = null;
@@ -492,6 +484,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
 
         controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider);
+
+        clusterCoordinator = new WebClusterManagerCoordinator(this, senderListener);
+        heartbeatMonitor = new ClusterProtocolHeartbeatMonitor(clusterCoordinator, properties);
+        senderListener.addHandler(heartbeatMonitor);
     }
 
     public void start() throws IOException {
@@ -502,13 +498,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             }
 
             try {
-                // setup heartbeat monitoring
-                heartbeatMonitor = new Timer("Heartbeat Monitor", /* is daemon */ true);
-                heartbeatMonitor.schedule(new HeartbeatMonitoringTimerTask(), 0, getHeartbeatMonitoringIntervalSeconds() * 1000);
-
-                heartbeatProcessor = new Timer("Process Pending Heartbeats", true);
-                final int processPendingHeartbeatDelay = 1000 * Math.max(1, getClusterProtocolHeartbeatSeconds() / 2);
-                heartbeatProcessor.schedule(new ProcessPendingHeartbeatsTask(), processPendingHeartbeatDelay, processPendingHeartbeatDelay);
+                heartbeatMonitor.start();
 
                 // start request replication service
                 httpRequestReplicator.start();
@@ -572,16 +562,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
             boolean encounteredException = false;
 
-            // stop the heartbeat monitoring
-            if (isHeartbeatMonitorRunning()) {
-                heartbeatMonitor.cancel();
-                heartbeatMonitor = null;
-            }
-
-            if (heartbeatProcessor != null) {
-                heartbeatProcessor.cancel();
-                heartbeatProcessor = null;
-            }
+            heartbeatMonitor.stop();
 
             // stop the HTTP request replicator service
             if (httpRequestReplicator.isRunning()) {
@@ -628,8 +609,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public boolean isRunning() {
         readLock.lock();
         try {
-            return isHeartbeatMonitorRunning()
-                    || httpRequestReplicator.isRunning()
+            return httpRequestReplicator.isRunning()
                     || senderListener.isRunning()
                     || dataFlowManagementService.isRunning()
                     || isBroadcasting();
@@ -640,10 +620,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     @Override
     public boolean canHandle(ProtocolMessage msg) {
-        return MessageType.CONNECTION_REQUEST == msg.getType()
-                || MessageType.HEARTBEAT == msg.getType()
-                || MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType()
-                || MessageType.RECONNECTION_FAILURE == msg.getType();
+        return MessageType.CONNECTION_REQUEST == msg.getType();
     }
 
     @Override
@@ -651,31 +628,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         switch (protocolMessage.getType()) {
             case CONNECTION_REQUEST:
                 return handleConnectionRequest((ConnectionRequestMessage) protocolMessage);
-            case HEARTBEAT:
-                final HeartbeatMessage heartbeatMessage = (HeartbeatMessage) protocolMessage;
-
-                final Heartbeat original = heartbeatMessage.getHeartbeat();
-                final NodeIdentifier originalNodeId = original.getNodeIdentifier();
-                final Heartbeat heartbeatWithDn = new Heartbeat(addRequestorDn(originalNodeId, heartbeatMessage.getRequestorDN()), original.isPrimary(), original.isConnected(), original.getPayload());
-
-                handleHeartbeat(heartbeatWithDn);
-                return null;
-            case CONTROLLER_STARTUP_FAILURE:
-                new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        handleControllerStartupFailure((ControllerStartupFailureMessage) protocolMessage);
-                    }
-                }, "Handle Controller Startup Failure Message from " + ((ControllerStartupFailureMessage) protocolMessage).getNodeId()).start();
-                return null;
-            case RECONNECTION_FAILURE:
-                new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        handleReconnectionFailure((ReconnectionFailureMessage) protocolMessage);
-                    }
-                }, "Handle Reconnection Failure Message from " + ((ReconnectionFailureMessage) protocolMessage).getNodeId()).start();
-                return null;
             default:
                 throw new ProtocolException("No handler defined for message type: " + protocolMessage.getType());
         }
@@ -758,16 +710,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 addEvent(node.getNodeId(), "Connection requested from new node.  Setting status to connecting.");
                 nodes.add(node);
             } else {
-                node.setStatus(Status.CONNECTING);
+                clusterCoordinator.updateNodeStatus(node, Status.CONNECTING);
                 addEvent(resolvedNodeIdentifier, "Connection requested from existing node.  Setting status to connecting");
             }
 
             // record the time of the connection request
             node.setConnectionRequestedTimestamp(new Date().getTime());
 
-            // clear out old heartbeat info
-            node.setHeartbeat(null);
-
             // try to obtain a current flow
             if (dataFlowManagementService.isFlowCurrent()) {
                 // if a cached copy does not exist, load it from disk
@@ -777,17 +726,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     primaryNodeId = clusterDataFlow.getPrimaryNodeId();
                 }
 
-                // determine if this node should be assigned the primary role
-                final boolean primaryRole;
-                if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) {
-                    setPrimaryNodeId(node.getNodeId());
-                    addEvent(node.getNodeId(), "Setting primary role in connection response.");
-                    primaryRole = true;
-                } else {
-                    primaryRole = false;
-                }
-
-                return new ConnectionResponse(node.getNodeId(), cachedDataFlow, primaryRole, remoteInputPort, remoteCommsSecure, instanceId);
+                return new ConnectionResponse(node.getNodeId(), cachedDataFlow, remoteInputPort, remoteCommsSecure, instanceId);
             }
 
             /*
@@ -848,9 +787,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 throw new IllegalNodeReconnectionException("Node must be disconnected before it can reconnect.");
             }
 
-            // clear out old heartbeat info
-            node.setHeartbeat(null);
-
             // get the dataflow to send with the reconnection request
             if (!dataFlowManagementService.isFlowCurrent()) {
                 /* node remains disconnected */
@@ -867,7 +803,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 primaryNodeId = clusterDataFlow.getPrimaryNodeId();
             }
 
-            node.setStatus(Status.CONNECTING);
+            clusterCoordinator.updateNodeStatus(node, Status.CONNECTING);
             addEvent(node.getNodeId(), "Reconnection requested for node.  Setting status to connecting.");
 
             // determine if this node should be assigned the primary role
@@ -889,7 +825,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         } catch (final Exception ex) {
             logger.warn("Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + ex, ex);
 
-            node.setStatus(Status.DISCONNECTED);
+            clusterCoordinator.updateNodeStatus(node, Status.DISCONNECTED);
             final String eventMsg = "Problem encountered issuing reconnection request. Node will remain disconnected: " + ex;
             addEvent(node.getNodeId(), eventMsg);
             addBulletin(node, Severity.WARNING, eventMsg);
@@ -1214,7 +1150,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             if (node == null) {
                 throw new UnknownNodeException("Node does not exist.");
             }
-            requestDisconnection(node.getNodeId(), /* ignore last node */ false, "User " + userDn + " Disconnected Node");
+
+            clusterCoordinator.requestNodeDisconnect(node.getNodeId(), DisconnectionCode.USER_DISCONNECTED, "User " + userDn + " Disconnected Node");
         } finally {
             writeLock.unlock("requestDisconnection(String)");
         }
@@ -1246,7 +1183,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
      * true.
      * @throws NodeDisconnectionException if the disconnection message fails to be sent.
      */
-    private void requestDisconnection(final NodeIdentifier nodeId, final boolean ignoreNodeChecks, final String explanation)
+    void requestDisconnection(final NodeIdentifier nodeId, final boolean ignoreNodeChecks, final String explanation)
             throws IllegalNodeDisconnectionException, NodeDisconnectionException {
 
         writeLock.lock();
@@ -1277,14 +1214,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 // cannot disconnect the last connected node in the cluster
                 if (connectedNodes.size() == 1 && connectedNodes.iterator().next().equals(nodeId)) {
                     throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the only connected node in the cluster.");
-                } else if (isPrimaryNode(nodeId)) {
-                    // cannot disconnect the primary node in the cluster
-                    throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the primary node in the cluster.");
                 }
             }
 
             // update status
-            node.setStatus(Status.DISCONNECTED);
+            clusterCoordinator.updateNodeStatus(node, Status.DISCONNECTED);
             notifyDataFlowManagementServiceOfNodeStatusChange();
 
             // issue the disconnection
@@ -1296,6 +1230,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             senderListener.disconnect(request);
             addEvent(nodeId, "Node disconnected due to " + explanation);
             addBulletin(node, Severity.INFO, "Node disconnected due to " + explanation);
+
+            heartbeatMonitor.removeHeartbeat(nodeId);
         } finally {
             writeLock.unlock("requestDisconnection(NodeIdentifier, boolean)");
         }
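
The new heartbeatMonitor.removeHeartbeat(nodeId) call above implies per-node bookkeeping in the monitor. A hedged sketch of one plausible way to keep that bookkeeping, assuming only the getLatestHeartbeat()/removeHeartbeat() accessors that appear elsewhere in this diff; the LatestHeartbeats class and the heartbeatReceived() method are illustrative, not part of this commit, and the NodeHeartbeat import path is assumed from the new coordination.heartbeat package.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
    import org.apache.nifi.cluster.protocol.NodeIdentifier;

    // Illustrative sketch: a concurrent map holding the latest heartbeat per node,
    // cleared when the node is disconnected so stale data is not reported.
    public class LatestHeartbeats {
        private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeats = new ConcurrentHashMap<>();

        public void heartbeatReceived(final NodeIdentifier nodeId, final NodeHeartbeat heartbeat) {
            // later heartbeats simply replace earlier ones for the same node
            heartbeats.put(nodeId, heartbeat);
        }

        public NodeHeartbeat getLatestHeartbeat(final NodeIdentifier nodeId) {
            return heartbeats.get(nodeId);
        }

        public void removeHeartbeat(final NodeIdentifier nodeId) {
            heartbeats.remove(nodeId);
        }
    }
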
@@ -1318,36 +1254,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return responseMessage;
     }
 
-    private void handleControllerStartupFailure(final ControllerStartupFailureMessage msg) {
-        writeLock.lock();
-        try {
-            final Node node = getRawNode(msg.getNodeId().getId());
-            if (node != null) {
-                node.setStatus(Status.DISCONNECTED);
-                addEvent(msg.getNodeId(), "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported "
-                        + "the following error: " + msg.getExceptionMessage());
-                addBulletin(node, Severity.ERROR, "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node "
-                        + "reported the following error: " + msg.getExceptionMessage());
-            }
-        } finally {
-            writeLock.unlock("handleControllerStartupFailure");
-        }
-    }
-
-    private void handleReconnectionFailure(final ReconnectionFailureMessage msg) {
-        writeLock.lock();
-        try {
-            final Node node = getRawNode(msg.getNodeId().getId());
-            if (node != null) {
-                node.setStatus(Status.DISCONNECTED);
-                final String errorMsg = "Node could not rejoin cluster. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage();
-                addEvent(msg.getNodeId(), errorMsg);
-                addBulletin(node, Severity.ERROR, errorMsg);
-            }
-        } finally {
-            writeLock.unlock("handleControllerStartupFailure");
-        }
-    }
 
     @Override
     public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
@@ -1623,176 +1529,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     }
 
 
-    /**
-     * Handles a node's heartbeat. If this heartbeat is a node's first heartbeat since its connection request, then the manager will mark the node as connected. If the node was previously disconnected
-     * due to a lack of heartbeat, then a reconnection request is issued. If the node was disconnected for other reasons, then a disconnection request is issued. If this instance is configured with a
-     * firewall and the heartbeat is blocked, then a disconnection request is issued.
-     */
-    @Override
-    public void handleHeartbeat(final Heartbeat heartbeat) {
-        // sanity check heartbeat
-        if (heartbeat == null) {
-            throw new IllegalArgumentException("Heartbeat may not be null.");
-        } else if (heartbeat.getNodeIdentifier() == null) {
-            throw new IllegalArgumentException("Heartbeat does not contain a node ID.");
-        }
-
-        /*
-         * Processing a heartbeat requires a write lock, which may take a while
-         * to obtain.  Only the most recent heartbeat per node needs to be processed.
-         * Furthermore, since many could pile up, heartbeats are processed in
-         * bulk.
-         * The queue below stores the pending heartbeats.
-         */
-        pendingHeartbeats.add(heartbeat);
-    }
-
-    private void processPendingHeartbeats() {
-        Node node;
-
-        writeLock.lock();
-        try {
-            /*
-             * Get the most recent heartbeats for the nodes in the cluster.  This
-             * is achieved by "draining" the pending heartbeats queue, populating
-             * a map that associates a node identifier with its latest heartbeat, and
-             * finally, getting the values of the map.
-             */
-            final Map<NodeIdentifier, Heartbeat> mostRecentHeartbeatsMap = new HashMap<>();
-            Heartbeat aHeartbeat;
-            while ((aHeartbeat = pendingHeartbeats.poll()) != null) {
-                mostRecentHeartbeatsMap.put(aHeartbeat.getNodeIdentifier(), aHeartbeat);
-            }
-            final Collection<Heartbeat> mostRecentHeartbeats = new ArrayList<>(mostRecentHeartbeatsMap.values());
-
-            // return fast if no work to do
-            if (mostRecentHeartbeats.isEmpty()) {
-                return;
-            }
-
-            logNodes("Before Heartbeat Processing", heartbeatLogger);
-
-            final int numPendingHeartbeats = mostRecentHeartbeats.size();
-            if (heartbeatLogger.isDebugEnabled()) {
-                heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, numPendingHeartbeats > 1 ? "s" : ""));
-            }
-
-            for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
-                try {
-                    // resolve the proposed node identifier to valid node identifier
-                    final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(mostRecentHeartbeat.getNodeIdentifier());
-
-                    // get a raw reference to the node (if it doesn't exist, node will be null)
-                    node = getRawNode(resolvedNodeIdentifier.getId());
-
-                    final boolean heartbeatIndicatesNotYetConnected = !mostRecentHeartbeat.isConnected();
-
-                    if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
-                        if (node == null) {
-                            logger.info("Firewall blocked heartbeat received from unknown node " + resolvedNodeIdentifier + ".  Issuing disconnection request.");
-                        } else {
-                            // record event
-                            addEvent(resolvedNodeIdentifier, "Firewall blocked received heartbeat.  Issuing disconnection request.");
-                        }
-
-                        // request node to disconnect
-                        requestDisconnectionQuietly(resolvedNodeIdentifier, "Blocked By Firewall");
-
-                    } else if (node == null) {  // unknown node, so issue reconnect request
-                        // create new node and add to node set
-                        final Node newNode = new Node(resolvedNodeIdentifier, Status.DISCONNECTED);
-                        nodes.add(newNode);
-
-                        // record event
-                        addEvent(newNode.getNodeId(), "Received heartbeat from unknown node.  Issuing reconnection request.");
-
-                        // record heartbeat
-                        newNode.setHeartbeat(mostRecentHeartbeat);
-                        requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
-                    } else if (heartbeatIndicatesNotYetConnected) {
-                        if (Status.CONNECTED == node.getStatus()) {
-                            // record event
-                            addEvent(node.getNodeId(), "Received heartbeat from node that thinks it is not yet part of the cluster, though the Manager thought it "
-                                    + "was. Marking as Disconnected and issuing reconnection request.");
-
-                            // record heartbeat
-                            node.setHeartbeat(null);
-                            node.setStatus(Status.DISCONNECTED);
-
-                            requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
-                        }
-                    } else if (Status.DISCONNECTED == node.getStatus()) {
-                        // ignore heartbeats from nodes disconnected by means other than lack of heartbeat, unless it is
-                        // the only node. We allow it if it is the only node because if we have a one-node cluster, then
-                        // we cannot manually reconnect it.
-                        if (node.isHeartbeatDisconnection() || nodes.size() == 1) {
-                            // record event
-                            if (node.isHeartbeatDisconnection()) {
-                                addEvent(resolvedNodeIdentifier, "Received heartbeat from node previously disconnected due to lack of heartbeat.  Issuing reconnection request.");
-                            } else {
-                                addEvent(resolvedNodeIdentifier, "Received heartbeat from node previously disconnected, but it is the only known node, so issuing reconnection request.");
-                            }
-
-                            // record heartbeat
-                            node.setHeartbeat(mostRecentHeartbeat);
-
-                            // request reconnection
-                            requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
-                        } else {
-                            // disconnected nodes should not heartbeat, so we need to issue a disconnection request
-                            heartbeatLogger.info("Ignoring received heartbeat from disconnected node " + resolvedNodeIdentifier + ".  Issuing disconnection request.");
-
-                            // request node to disconnect
-                            requestDisconnectionQuietly(resolvedNodeIdentifier, "Received Heartbeat from Node, but Manager has already marked Node as Disconnected");
-                        }
-
-                    } else if (Status.DISCONNECTING == node.getStatus()) {
-                        /* ignore spurious heartbeat */
-                    } else {  // node is either connected or connecting
-                        // first heartbeat causes status change from connecting to connected
-                        if (Status.CONNECTING == node.getStatus()) {
-                            if (mostRecentHeartbeat.getCreatedTimestamp() < node.getConnectionRequestedTimestamp()) {
-                                heartbeatLogger.info("Received heartbeat for node " + resolvedNodeIdentifier + " but ignoring because it was generated before the node was last asked to reconnect.");
-                                continue;
-                            }
-
-                            // set status to connected
-                            node.setStatus(Status.CONNECTED);
-
-                            // record event
-                            addEvent(resolvedNodeIdentifier, "Received first heartbeat from connecting node.  Setting node to connected.");
-
-                            // notify service of updated node set
-                            notifyDataFlowManagementServiceOfNodeStatusChange();
-
-                            addBulletin(node, Severity.INFO, "Node Connected");
-                        } else {
-                            heartbeatLogger.info("Received heartbeat for node " + resolvedNodeIdentifier + ".");
-                        }
-
-                        // record heartbeat
-                        node.setHeartbeat(mostRecentHeartbeat);
-
-                        if (mostRecentHeartbeat.isPrimary()) {
-                            setPrimaryNodeId(node.getNodeId());
-                        }
-                    }
-                } catch (final Exception e) {
-                    logger.error("Failed to process heartbeat from {}:{} due to {}",
-                            mostRecentHeartbeat.getNodeIdentifier().getApiAddress(), mostRecentHeartbeat.getNodeIdentifier().getApiPort(), e.toString());
-                    if (logger.isDebugEnabled()) {
-                        logger.error("", e);
-                    }
-                }
-            }
-
-            logNodes("After Heartbeat Processing", heartbeatLogger);
-        } finally {
-            writeLock.unlock("processPendingHeartbeats");
-        }
-    }
-
-
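
The comment removed above (in processPendingHeartbeats) describes the coalescing technique: drain the pending queue into a map so that only the most recent heartbeat per node is processed under the write lock. A standalone sketch of that pattern with generic types, independent of the NiFi classes:

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Generic sketch of the coalescing pattern: producers enqueue (nodeId, heartbeat)
    // pairs without blocking; the consumer drains the queue and keeps only the most
    // recent heartbeat seen for each node.
    public class HeartbeatCoalescer<K, H> {
        private final Queue<Map.Entry<K, H>> pending = new ConcurrentLinkedQueue<>();

        public void offer(final K nodeId, final H heartbeat) {
            pending.add(new SimpleImmutableEntry<>(nodeId, heartbeat));
        }

        public Map<K, H> drainLatest() {
            final Map<K, H> latest = new HashMap<>();
            Map.Entry<K, H> entry;
            while ((entry = pending.poll()) != null) {
                latest.put(entry.getKey(), entry.getValue());
            }
            return latest;
        }
    }
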
     @Override
     public Set<Node> getNodes(final Status... statuses) {
         final Set<Status> desiredStatusSet = new HashSet<>();
@@ -2125,15 +1861,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-    private boolean isPrimaryNode(final NodeIdentifier nodeId) {
-        readLock.lock();
-        try {
-            return primaryNodeId != null && primaryNodeId.equals(nodeId);
-        } finally {
-            readLock.unlock("isPrimaryNode");
-        }
-    }
-
     private boolean isInSafeMode() {
         readLock.lock();
         try {
@@ -2143,7 +1870,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-    private void setPrimaryNodeId(final NodeIdentifier primaryNodeId) throws DaoException {
+    void setPrimaryNodeId(final NodeIdentifier primaryNodeId) throws DaoException {
         writeLock.lock();
         try {
             dataFlowManagementService.updatePrimaryNode(primaryNodeId);
@@ -3321,7 +3048,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             // create new "updated" node by cloning old node and updating status
             final Node currentNode = getRawNode(nodeResponse.getNodeId().getId());
             final Node updatedNode = currentNode.clone();
-            updatedNode.setStatus(nodeStatus);
+            clusterCoordinator.updateNodeStatus(updatedNode, nodeStatus);
 
             // map updated node to its response
             updatedNodesMap.put(updatedNode, nodeResponse);
@@ -4041,7 +3768,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     final Node node = updatedNodeEntry.getKey();
 
                     if (problematicNodeResponses.contains(nodeResponse)) {
-                        node.setStatus(Status.CONNECTED);
+                        clusterCoordinator.updateNodeStatus(node, Status.CONNECTED);
                         problematicNodeResponses.remove(nodeResponse);
                     }
                 }
@@ -4227,7 +3954,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
      *
      * @return false if the IP is listed in the firewall or if the firewall is not configured; true otherwise
      */
-    private boolean isBlockedByFirewall(final String ip) {
+    boolean isBlockedByFirewall(final String ip) {
         if (isFirewallConfigured()) {
             return !clusterFirewall.isPermissible(ip);
         } else {
@@ -4257,7 +3984,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-    private Node getRawNode(final String nodeId) {
+    Node getRawNode(final String nodeId) {
         readLock.lock();
         try {
             for (final Node node : nodes) {
@@ -4320,16 +4047,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-
-    private boolean isHeartbeatMonitorRunning() {
-        readLock.lock();
-        try {
-            return heartbeatMonitor != null;
-        } finally {
-            readLock.unlock("isHeartbeatMonitorRunning");
-        }
-    }
-
     private boolean canChangeNodeState(final String method, final URI uri) {
         return HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method);
     }
@@ -4359,87 +4076,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-    private void logNodes(final String header, final Logger logger) {
-        if (logger.isTraceEnabled()) {
-            if (StringUtils.isNotBlank(header)) {
-                logger.trace(header);
-            }
-            for (final Node node : getNodes()) {
-                logger.trace(node.getNodeId() + " : " + node.getStatus());
-            }
-        }
-    }
-
-
-    /**
-     * This timer task simply processes any pending heartbeats. It is not strictly needed, as HeartbeatMonitoringTimerTask will do this. However, this task is scheduled much more
-     * frequently, and by processing the heartbeats more often, the statistics that we report have less of a delay.
-     */
-    private class ProcessPendingHeartbeatsTask extends TimerTask {
-
-        @Override
-        public void run() {
-            writeLock.lock();
-            try {
-                processPendingHeartbeats();
-            } finally {
-                writeLock.unlock("Process Pending Heartbeats Task");
-            }
-        }
-    }
-
-    /**
-     * A timer task to detect nodes that have not sent a heartbeat in a while. Such nodes are marked as disconnected due to lack of heartbeat by the task. No disconnection request is sent to
-     * the node, because either the node is not functioning, in which case sending the request is futile, or the node is simply running slowly. In the latter case, we'll wait for the next heartbeat
-     * and send a reconnection request when we process the heartbeat in the handleHeartbeat() method.
-     */
-    private class HeartbeatMonitoringTimerTask extends TimerTask {
-
-        @Override
-        public void run() {
-            // keep track of any status changes
-            boolean statusChanged = false;
-
-            writeLock.lock();
-            try {
-                // process all of the heartbeats before we decided to kick anyone out of the cluster.
-                logger.debug("Processing pending heartbeats...");
-                processPendingHeartbeats();
-
-                logger.debug("Executing heartbeat monitoring");
-
-                // check for any nodes that have not heartbeated in a long time
-                for (final Node node : getRawNodes(Status.CONNECTED)) {
-                    // return prematurely if we were interrupted
-                    if (Thread.currentThread().isInterrupted()) {
-                        return;
-                    }
-
-                    // check if we received a recent heartbeat, changing status to disconnected if necessary
-                    final long lastHeardTimestamp = node.getHeartbeat().getCreatedTimestamp();
-                    final int heartbeatGapSeconds = (int) (new Date().getTime() - lastHeardTimestamp) / 1000;
-                    if (heartbeatGapSeconds > getMaxHeartbeatGapSeconds()) {
-                        node.setHeartbeatDisconnection();
-                        addEvent(node.getNodeId(), "Node disconnected due to lack of heartbeat.");
-                        addBulletin(node, Severity.WARNING, "Node disconnected due to lack of heartbeat");
-                        statusChanged = true;
-                    }
-                }
-
-                // if a status change occurred, make the necessary updates
-                if (statusChanged) {
-                    logNodes("Heartbeat Monitoring disconnected node(s)", logger);
-                    // notify service of updated node set
-                    notifyDataFlowManagementServiceOfNodeStatusChange();
-                } else {
-                    logNodes("Heartbeat Monitoring determined all nodes are healthy", logger);
-                }
-            } catch (final Exception ex) {
-                logger.warn("Heartbeat monitor experienced exception while monitoring: " + ex, ex);
-            } finally {
-                writeLock.unlock("HeartbeatMonitoringTimerTask");
-            }
-        }
+    public NodeHeartbeat getLatestHeartbeat(final NodeIdentifier nodeId) {
+        return heartbeatMonitor.getLatestHeartbeat(nodeId);
     }
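
The staleness check that lived in the removed HeartbeatMonitoringTimerTask (compare the time since the last heartbeat against a configured maximum gap) now has to be expressed against the monitor's getLatestHeartbeat() accessor shown above. A hedged sketch of that check: the HeartbeatGapCheck class and maxGapMillis value are illustrative, heartbeatMonitor is assumed to be a HeartbeatMonitor from the new coordination.heartbeat package, and NodeHeartbeat.getTimestamp() is assumed as the counterpart of the old Heartbeat.getCreatedTimestamp().

    import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
    import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
    import org.apache.nifi.cluster.protocol.NodeIdentifier;

    // Illustrative sketch of the lack-of-heartbeat check against the new accessor.
    public class HeartbeatGapCheck {
        private final HeartbeatMonitor heartbeatMonitor;
        private final long maxGapMillis;

        public HeartbeatGapCheck(final HeartbeatMonitor heartbeatMonitor, final long maxGapMillis) {
            this.heartbeatMonitor = heartbeatMonitor;
            this.maxGapMillis = maxGapMillis;
        }

        public boolean isStale(final NodeIdentifier nodeId) {
            final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId);
            if (heartbeat == null) {
                return true; // no heartbeat recorded for this node at all
            }
            // getTimestamp() is assumed here; the removed code used Heartbeat.getCreatedTimestamp()
            return System.currentTimeMillis() - heartbeat.getTimestamp() > maxGapMillis;
        }
    }
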
 
     @Override
@@ -4449,16 +4087,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             final Collection<NodeInformation> nodeInfos = new ArrayList<>();
             for (final Node node : getRawNodes(Status.CONNECTED)) {
                 final NodeIdentifier id = node.getNodeId();
-                final HeartbeatPayload heartbeat = node.getHeartbeatPayload();
-                if (heartbeat == null) {
-                    continue;
-                }
 
                 final Integer siteToSitePort = id.getSiteToSitePort();
                 if (siteToSitePort == null) {
                     continue;
                 }
-                final int flowFileCount = (int) heartbeat.getTotalFlowFileCount();
+
+                final NodeHeartbeat nodeHeartbeat = heartbeatMonitor.getLatestHeartbeat(id);
+                final int flowFileCount = nodeHeartbeat == null ? 0 : nodeHeartbeat.getFlowFileCount();
                 final NodeInformation nodeInfo = new NodeInformation(id.getSiteToSiteAddress(), siteToSitePort, id.getApiPort(),
                     id.isSiteToSiteSecure(), flowFileCount);
                 nodeInfos.add(nodeInfo);
@@ -4626,4 +4262,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
         return controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
     }
+
+    public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String message) {
+        bulletinRepository.addBulletin(BulletinFactory.createBulletin(nodeId == null ? "Cluster" : nodeId.getId(), severity.name(), message));
+        if (nodeId != null) {
+            addEvent(nodeId, message);
+        }
+    }
 }