You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/22 21:01:48 UTC

[2/4] nifi git commit: NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
new file mode 100644
index 0000000..fc75601
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.manager.impl;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.node.Node.Status;
+import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ClusterCoordinator} implementation backed by the {@link WebClusterManager} (NCM). Node status changes
+ * are applied to the NCM's node objects and broadcast to all CONNECTED/CONNECTING nodes through the
+ * {@link ClusterManagerProtocolSender}.
+ */
+public class WebClusterManagerCoordinator implements ClusterCoordinator {
+    private static final Logger logger = LoggerFactory.getLogger(WebClusterManagerCoordinator.class);
+    private static final AtomicLong nodeStatusIdGenerator = new AtomicLong(0L);
+
+    private final WebClusterManager clusterManager;
+    private final ClusterManagerProtocolSender protocolSender;
+
+    /**
+     * @param clusterManager the cluster manager whose nodes this coordinator reads and updates
+     * @param protocolSender the sender used to broadcast node status change messages
+     */
+    public WebClusterManagerCoordinator(final WebClusterManager clusterManager, final ClusterManagerProtocolSender protocolSender) {
+        this.clusterManager = clusterManager;
+        this.protocolSender = protocolSender;
+    }
+
+    /**
+     * If the NCM does not know the node, a new connection request is submitted for it. Otherwise, the node is marked
+     * disconnected with code NOT_YET_CONNECTED and the NCM is asked to reconnect it.
+     */
+    @Override
+    public void requestNodeConnect(final NodeIdentifier nodeId) {
+        final Node node = clusterManager.getRawNode(nodeId.getId());
+
+        if (node == null) {
+            final ConnectionRequest connectionRequest = new ConnectionRequest(nodeId);
+            clusterManager.requestConnection(connectionRequest);
+        } else {
+            updateNodeStatus(nodeId, new NodeConnectionStatus(DisconnectionCode.NOT_YET_CONNECTED, "Requesting that Node Connect to the Cluster"));
+            // NOTE(review): second argument appears to identify the requesting user; confirm against WebClusterManager
+            clusterManager.requestReconnection(nodeId.getId(), "Anonymous");
+        }
+    }
+
+    @Override
+    public void finishNodeConnection(final NodeIdentifier nodeId) {
+        final boolean updated = updateNodeStatus(nodeId, new NodeConnectionStatus(NodeConnectionState.CONNECTED));
+        if (!updated) {
+            logger.error("Attempting to Finish Node Connection but could not find Node with Identifier {}", nodeId);
+        }
+    }
+
+    /**
+     * Asks the NCM to disconnect the node. For LACK_OF_HEARTBEAT, the node is additionally flagged as having been
+     * disconnected due to missing heartbeats. Failures are logged rather than propagated.
+     */
+    @Override
+    public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+        try {
+            clusterManager.requestDisconnection(nodeId, false, explanation);
+
+            if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT) {
+                final Node node = clusterManager.getRawNode(nodeId.getId());
+                if (node != null) {
+                    updateNodeStatus(node, Status.DISCONNECTED, true);
+                }
+            }
+        } catch (final Exception e) {
+            // The trailing Throwable is not consumed by a placeholder, so SLF4J logs its stack trace in the same entry
+            logger.error("Failed to request node {} disconnect from cluster due to {}", nodeId, e.toString(), e);
+        }
+    }
+
+    /**
+     * Records that the node itself asked to disconnect, and reports an ERROR event for failure-type codes
+     * (STARTUP_FAILURE, MISMATCHED_FLOWS, UNKNOWN) or an INFO event otherwise.
+     */
+    @Override
+    public void disconnectionRequestedByNode(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+        updateNodeStatus(nodeId, new NodeConnectionStatus(disconnectionCode, explanation));
+
+        final Severity severity;
+        switch (disconnectionCode) {
+            case STARTUP_FAILURE:
+            case MISMATCHED_FLOWS:
+            case UNKNOWN:
+                severity = Severity.ERROR;
+                break;
+            default:
+                severity = Severity.INFO;
+                break;
+        }
+
+        reportEvent(nodeId, severity, "Node disconnected from cluster due to " + explanation);
+    }
+
+    /**
+     * @return the node's current connection status, or <code>null</code> if the NCM does not know the node
+     */
+    @Override
+    public NodeConnectionStatus getConnectionStatus(final NodeIdentifier nodeId) {
+        final Node node = clusterManager.getNode(nodeId.getId());
+        if (node == null) {
+            return null;
+        }
+
+        // assumes every Node.Status constant has a same-named NodeConnectionState constant (valueOf throws otherwise)
+        final Status status = node.getStatus();
+        final NodeConnectionState connectionState = NodeConnectionState.valueOf(status.name());
+        return new NodeConnectionStatus(connectionState, node.getConnectionRequestedTimestamp());
+    }
+
+    @Override
+    public Set<NodeIdentifier> getNodeIdentifiers(final NodeConnectionState state) {
+        final Status status = Status.valueOf(state.name());
+        final Set<Node> nodes = clusterManager.getNodes(status);
+        return nodes.stream()
+            .map(Node::getNodeId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public boolean isBlockedByFirewall(final String hostname) {
+        return clusterManager.isBlockedByFirewall(hostname);
+    }
+
+    /**
+     * Logs the event at the level matching its severity (prefixed with the node's API address and port when a node
+     * is given) and forwards the prefixed message to the NCM.
+     */
+    @Override
+    public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String event) {
+        final String messagePrefix = nodeId == null ? "" : nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- ";
+        switch (severity) {
+            case INFO:
+                logger.info(messagePrefix + event);
+                break;
+            case WARNING:
+                logger.warn(messagePrefix + event);
+                break;
+            case ERROR:
+                logger.error(messagePrefix + event);
+                break;
+        }
+
+        clusterManager.reportEvent(nodeId, severity, messagePrefix + event);
+    }
+
+    @Override
+    public void setPrimaryNode(final NodeIdentifier nodeId) {
+        clusterManager.setPrimaryNodeId(nodeId);
+    }
+
+    @Override
+    public NodeIdentifier getNodeIdentifier(final String uuid) {
+        final Node node = clusterManager.getNode(uuid);
+        return node == null ? null : node.getNodeId();
+    }
+
+    /**
+     * Updates the status of the node with the given ID to the given status and returns <code>true</code>
+     * if successful, <code>false</code> if no node exists with the given ID
+     *
+     * @param nodeId the ID of the node whose status is changed
+     * @param status the new status of the node
+     * @return <code>true</code> if the node exists and is updated, <code>false</code> if the node does not exist
+     */
+    private boolean updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus status) {
+        final Node node = clusterManager.getRawNode(nodeId.getId());
+        if (node == null) {
+            return false;
+        }
+
+        // The update id is taken before the status is applied so ids follow the order in which changes are made
+        final long statusUpdateId = nodeStatusIdGenerator.incrementAndGet();
+        final Status nodeStatus = Status.valueOf(status.getState().name());
+        final Status oldStatus = node.setStatus(nodeStatus);
+
+        if (nodeStatus != oldStatus) {
+            broadcastNodeStatusChange(nodeId, status, statusUpdateId);
+        }
+
+        return true;
+    }
+
+    /**
+     * Updates the status of the given node to the given new status. This method exists only because the NCM currently handles
+     * some of the status changing and we want it to call into this coordinator instead to change the status.
+     *
+     * @param rawNode the node whose status should be updated
+     * @param nodeStatus the new status of the node
+     */
+    void updateNodeStatus(final Node rawNode, final Status nodeStatus) {
+        // TODO: Remove this method when NCM is removed
+        updateNodeStatus(rawNode, nodeStatus, false);
+    }
+
+    /**
+     * Updates the status of the given node to the given new status. This method exists only because the NCM currently handles
+     * some of the status changing and we want it to call into this coordinator instead to change the status.
+     *
+     * @param rawNode the node whose status should be updated
+     * @param nodeStatus the new status of the node; ignored when <code>heartbeatDisconnect</code> is true, because the
+     *            node is then always moved to DISCONNECTED
+     * @param heartbeatDisconnect indicates whether or not the node is being disconnected due to lack of heartbeat
+     */
+    void updateNodeStatus(final Node rawNode, final Status nodeStatus, final boolean heartbeatDisconnect) {
+        // TODO: Remove this method when NCM is removed.
+        final long statusUpdateId = nodeStatusIdGenerator.incrementAndGet();
+        final Status appliedStatus;
+        final Status oldStatus;
+        if (heartbeatDisconnect) {
+            // setHeartbeatDisconnection() always sets DISCONNECTED, so compare and broadcast that, not nodeStatus
+            appliedStatus = Status.DISCONNECTED;
+            oldStatus = rawNode.setHeartbeatDisconnection();
+        } else {
+            appliedStatus = nodeStatus;
+            oldStatus = rawNode.setStatus(nodeStatus);
+        }
+
+        if (appliedStatus != oldStatus) {
+            final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(NodeConnectionState.valueOf(appliedStatus.name()));
+            broadcastNodeStatusChange(rawNode.getNodeId(), connectionStatus, statusUpdateId);
+        }
+    }
+
+    /**
+     * Sends a status-change message for the given node to every node that is currently CONNECTED or CONNECTING.
+     *
+     * @param nodeId the node whose status changed
+     * @param status the node's new connection status
+     * @param statusUpdateId the identifier used to order status change events
+     */
+    private void broadcastNodeStatusChange(final NodeIdentifier nodeId, final NodeConnectionStatus status, final long statusUpdateId) {
+        final Set<NodeIdentifier> nodesToNotify = clusterManager.getNodes(Status.CONNECTED, Status.CONNECTING).stream()
+            .map(Node::getNodeId)
+            .collect(Collectors.toSet());
+
+        final NodeStatusChangeMessage message = new NodeStatusChangeMessage();
+        message.setNodeId(nodeId);
+        message.setNodeConnectionStatus(status);
+        // TODO: When this is sent from one node to another, we need to ensure that we check the current
+        // 'revision number' on the node and include that as the Update ID because we need a way to indicate
+        // which status change event occurred first. I.e., when the status of a node is updated on any node
+        // that is not the elected leader, we need to ensure that our nodeStatusIdGenerator also is updated.
+        message.setStatusUpdateIdentifier(statusUpdateId);
+
+        protocolSender.notifyNodeStatusChange(nodesToNotify, message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
index 3bb3c1a..6f71834 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
@@ -18,12 +18,7 @@ package org.apache.nifi.cluster.node;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.nifi.cluster.HeartbeatPayload;
-import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Represents a connected flow controller. Nodes always have an immutable identifier and a status. The status may be changed, but never null.
@@ -35,8 +30,6 @@ import org.slf4j.LoggerFactory;
  */
 public class Node implements Cloneable, Comparable<Node> {
 
-    private static final Logger lockLogger = LoggerFactory.getLogger("cluster.lock");
-
     /**
      * The semantics of a Node status are as follows:
      * <ul>
@@ -66,15 +59,6 @@ public class Node implements Cloneable, Comparable<Node> {
      */
     private Status status;
 
-    /**
-     * the last heartbeat received by from the node
-     */
-    private Heartbeat lastHeartbeat;
-
-    /**
-     * the payload of the last heartbeat received from the node
-     */
-    private HeartbeatPayload lastHeartbeatPayload;
 
     /**
      * the last time the connection for this node was requested
@@ -101,40 +85,6 @@ public class Node implements Cloneable, Comparable<Node> {
     }
 
     /**
-     * Returns the last received heartbeat or null if no heartbeat has been set.
-     *
-     * @return a heartbeat or null
-     */
-    public Heartbeat getHeartbeat() {
-        return lastHeartbeat;
-    }
-
-    public HeartbeatPayload getHeartbeatPayload() {
-        return lastHeartbeatPayload;
-    }
-
-    /**
-     * Sets the last heartbeat received.
-     *
-     * @param heartbeat a heartbeat
-     *
-     * @throws ProtocolException if the heartbeat's payload failed unmarshalling
-     */
-    public void setHeartbeat(final Heartbeat heartbeat) throws ProtocolException {
-        this.lastHeartbeat = heartbeat;
-        if (this.lastHeartbeat == null) {
-            this.lastHeartbeatPayload = null;
-        } else {
-            final byte[] payload = lastHeartbeat.getPayload();
-            if (payload == null || payload.length == 0) {
-                this.lastHeartbeatPayload = null;
-            } else {
-                this.lastHeartbeatPayload = HeartbeatPayload.unmarshal(payload);
-            }
-        }
-    }
-
-    /**
      * Returns the time of the last received connection request for this node.
      *
      * @return the time when the connection request for this node was received.
@@ -166,34 +116,38 @@ public class Node implements Cloneable, Comparable<Node> {
     /**
-     * Sets the status to disconnected and flags the node as being disconnected by lack of heartbeat.
+     * Sets the status to disconnected, flags the node as being disconnected by lack of heartbeat, and returns the previous status.
      */
-    public void setHeartbeatDisconnection() {
-        setStatus(Status.DISCONNECTED);
-        heartbeatDisconnection = true;
+    public synchronized Status setHeartbeatDisconnection() {
+        final Status oldStatus = setStatus(Status.DISCONNECTED);
+        heartbeatDisconnection = true; // held under the same lock as setStatus(), which resets this flag
+        return oldStatus;
     }
 
     /**
      * @return the status
      */
-    public Status getStatus() {
+    public synchronized Status getStatus() {
         return status;
     }
 
     /**
+     * Updates the status to the given value, returning the previous status
+     *
      * @param status a status
+     * @return the previous status for the node
      */
-    public void setStatus(final Status status) {
+    public synchronized Status setStatus(final Status status) {
         if (status == null) {
             throw new IllegalArgumentException("Status may not be null.");
         }
+        final Status oldStatus = this.status;
         this.status = status;
         heartbeatDisconnection = false;
+        return oldStatus;
     }
 
     @Override
     public Node clone() {
         final Node clone = new Node(nodeId, status);
-        clone.lastHeartbeat = lastHeartbeat;
-        clone.lastHeartbeatPayload = lastHeartbeatPayload;
         clone.heartbeatDisconnection = heartbeatDisconnection;
         clone.connectionRequestedTimestamp = connectionRequestedTimestamp;
         return clone;

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
new file mode 100644
index 0000000..e218e05
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests how {@link AbstractHeartbeatMonitor} reacts to heartbeats, using an in-memory {@link ClusterCoordinator}
+ * and an embedded ZooKeeper server.
+ */
+public class TestAbstractHeartbeatMonitor {
+    private TestingServer zkServer;
+    private NodeIdentifier nodeId;
+    private TestFriendlyHeartbeatMonitor monitor;
+
+    @Before
+    public void setup() throws Exception {
+        // Passing 'true' starts the embedded ZooKeeper server immediately; no separate start() call is needed
+        zkServer = new TestingServer(true);
+        nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", null, false);
+    }
+
+    @After
+    public void clear() throws IOException {
+        // Stop the monitor before ZooKeeper, because the monitor is configured (createProperties) to use that server
+        try {
+            if (monitor != null) {
+                monitor.stop();
+            }
+        } finally {
+            if (zkServer != null) {
+                zkServer.stop();
+                zkServer.close();
+            }
+        }
+    }
+
+    /**
+     * Verifies that a node that sends a heartbeat that indicates that it is 'connected' is asked to connect to
+     * cluster if the cluster coordinator does not know about the node
+     *
+     * @throws InterruptedException if interrupted
+     */
+    @Test
+    public void testNewConnectedHeartbeatFromUnknownNode() throws InterruptedException {
+        final List<NodeIdentifier> requestedToConnect = Collections.synchronizedList(new ArrayList<>());
+        final ClusterCoordinatorAdapter coordinator = new ClusterCoordinatorAdapter() {
+            @Override
+            public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
+                requestedToConnect.add(nodeId);
+            }
+        };
+
+        createMonitor(coordinator);
+
+        // Ensure that we request the Unknown Node connect to the cluster
+        final NodeHeartbeat heartbeat = createHeartbeat(nodeId, NodeConnectionState.CONNECTED);
+        monitor.addHeartbeat(heartbeat);
+        monitor.waitForProcessed();
+
+        assertEquals(1, requestedToConnect.size());
+        assertEquals(nodeId, requestedToConnect.get(0));
+        assertEquals(1, coordinator.getEvents().size());
+    }
+
+    /**
+     * Verifies that a node that sends a heartbeat that indicates that it is 'connected' if previously
+     * manually disconnected, will be asked to disconnect from the cluster again.
+     *
+     * @throws InterruptedException if interrupted
+     */
+    @Test
+    public void testHeartbeatFromManuallyDisconnectedNode() throws InterruptedException {
+        final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>());
+        final Set<NodeIdentifier> requestedToDisconnect = Collections.synchronizedSet(new HashSet<>());
+        final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() {
+            @Override
+            public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
+                super.requestNodeConnect(nodeId);
+                requestedToConnect.add(nodeId);
+            }
+
+            @Override
+            public synchronized void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+                super.requestNodeDisconnect(nodeId, disconnectionCode, explanation);
+                requestedToDisconnect.add(nodeId);
+            }
+        };
+
+        createMonitor(adapter);
+
+        adapter.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, "Unit Testing");
+        monitor.addHeartbeat(createHeartbeat(nodeId, NodeConnectionState.CONNECTED));
+        monitor.waitForProcessed();
+
+        assertEquals(1, requestedToDisconnect.size());
+        assertEquals(nodeId, requestedToDisconnect.iterator().next());
+        assertTrue(requestedToConnect.isEmpty());
+    }
+
+    /**
+     * Verifies that a node in the 'connecting' state is marked connected, and not asked to connect again,
+     * when it sends a 'connected' heartbeat.
+     *
+     * @throws InterruptedException if interrupted
+     */
+    @Test
+    public void testConnectingNodeMarkedConnectedWhenHeartbeatReceived() throws InterruptedException {
+        final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>());
+        final Set<NodeIdentifier> connected = Collections.synchronizedSet(new HashSet<>());
+        final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() {
+            @Override
+            public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
+                super.requestNodeConnect(nodeId);
+                requestedToConnect.add(nodeId);
+            }
+
+            @Override
+            public synchronized void finishNodeConnection(final NodeIdentifier nodeId) {
+                super.finishNodeConnection(nodeId);
+                connected.add(nodeId);
+            }
+        };
+
+        createMonitor(adapter);
+
+        adapter.requestNodeConnect(nodeId); // set state to 'connecting'
+        requestedToConnect.clear();
+
+        monitor.addHeartbeat(createHeartbeat(nodeId, NodeConnectionState.CONNECTED));
+        monitor.waitForProcessed();
+
+        assertEquals(1, connected.size());
+        assertEquals(nodeId, connected.iterator().next());
+        assertTrue(requestedToConnect.isEmpty());
+    }
+
+    /**
+     * Verifies that a 'disconnected' heartbeat (NODE_SHUTDOWN) from an unknown node triggers no connect,
+     * finish-connection, or disconnect request.
+     *
+     * @throws InterruptedException if interrupted
+     */
+    @Test
+    public void testDisconnectedHeartbeatOnStartup() throws InterruptedException {
+        final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>());
+        final Set<NodeIdentifier> connected = Collections.synchronizedSet(new HashSet<>());
+        final Set<NodeIdentifier> disconnected = Collections.synchronizedSet(new HashSet<>());
+        final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() {
+            @Override
+            public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
+                super.requestNodeConnect(nodeId);
+                requestedToConnect.add(nodeId);
+            }
+
+            @Override
+            public synchronized void finishNodeConnection(final NodeIdentifier nodeId) {
+                super.finishNodeConnection(nodeId);
+                connected.add(nodeId);
+            }
+
+            @Override
+            public synchronized void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+                super.requestNodeDisconnect(nodeId, disconnectionCode, explanation);
+                disconnected.add(nodeId);
+            }
+        };
+
+        createMonitor(adapter);
+
+        monitor.addHeartbeat(createHeartbeat(nodeId, DisconnectionCode.NODE_SHUTDOWN));
+        monitor.waitForProcessed();
+
+        assertTrue(connected.isEmpty());
+        assertTrue(requestedToConnect.isEmpty());
+        assertTrue(disconnected.isEmpty());
+    }
+
+    private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode) {
+        final NodeConnectionStatus status = new NodeConnectionStatus(disconnectionCode);
+        return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, false, 0, 0, 0, 0);
+    }
+
+    private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) {
+        final NodeConnectionStatus status = new NodeConnectionStatus(state);
+        return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, false, 0, 0, 0, 0);
+    }
+
+    /**
+     * Creates and starts a monitor for the given coordinator, storing it in the {@code monitor} field so that
+     * {@link #clear()} stops it.
+     */
+    private TestFriendlyHeartbeatMonitor createMonitor(final ClusterCoordinator coordinator) {
+        monitor = new TestFriendlyHeartbeatMonitor(coordinator, createProperties());
+        monitor.start();
+        return monitor;
+    }
+
+    private Properties createProperties() {
+        final Properties properties = new Properties();
+        properties.setProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, "10 ms");
+        properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, zkServer.getConnectString());
+        properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, "3 secs");
+        properties.setProperty(NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, "3 secs");
+        properties.setProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, "/nifi");
+        return properties;
+    }
+
+    /**
+     * In-memory coordinator that records node states and reported events. Every accessor synchronizes on the
+     * adapter because the monitor thread and the test thread both use it.
+     */
+    private static class ClusterCoordinatorAdapter implements ClusterCoordinator {
+        private final Map<NodeIdentifier, NodeConnectionStatus> statuses = new HashMap<>();
+        private final List<ReportedEvent> events = new ArrayList<>();
+
+        @Override
+        public synchronized void requestNodeConnect(NodeIdentifier nodeId) {
+            statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.CONNECTING));
+        }
+
+        @Override
+        public synchronized void finishNodeConnection(NodeIdentifier nodeId) {
+            statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.CONNECTED));
+        }
+
+        @Override
+        public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
+            statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.DISCONNECTED));
+        }
+
+        @Override
+        public synchronized void disconnectionRequestedByNode(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
+            statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.DISCONNECTED));
+        }
+
+        @Override
+        public synchronized NodeConnectionStatus getConnectionStatus(NodeIdentifier nodeId) {
+            return statuses.get(nodeId);
+        }
+
+        @Override
+        public synchronized Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState state) {
+            return statuses.entrySet().stream().filter(p -> p.getValue().getState() == state).map(p -> p.getKey()).collect(Collectors.toSet());
+        }
+
+        @Override
+        public synchronized boolean isBlockedByFirewall(String hostname) {
+            return false;
+        }
+
+        @Override
+        public synchronized void reportEvent(NodeIdentifier nodeId, Severity severity, String event) {
+            events.add(new ReportedEvent(nodeId, severity, event));
+        }
+
+        @Override
+        public synchronized void setPrimaryNode(NodeIdentifier nodeId) {
+        }
+
+        synchronized List<ReportedEvent> getEvents() {
+            return new ArrayList<>(events);
+        }
+
+        @Override
+        public synchronized NodeIdentifier getNodeIdentifier(final String uuid) {
+            return statuses.keySet().stream().filter(p -> p.getId().equals(uuid)).findFirst().orElse(null);
+        }
+    }
+
+    /**
+     * Immutable record of an event passed to {@link ClusterCoordinator#reportEvent}.
+     */
+    public static class ReportedEvent {
+        private final NodeIdentifier nodeId;
+        private final Severity severity;
+        private final String event;
+
+        public ReportedEvent(NodeIdentifier nodeId, Severity severity, String event) {
+            this.nodeId = nodeId;
+            this.severity = severity;
+            this.event = event;
+        }
+
+        public NodeIdentifier getNodeId() {
+            return nodeId;
+        }
+
+        public Severity getSeverity() {
+            return severity;
+        }
+
+        public String getEvent() {
+            return event;
+        }
+    }
+
+    /**
+     * Heartbeat monitor whose heartbeats are supplied directly by the test, and which lets the test block until
+     * a monitoring pass has completed.
+     */
+    private static class TestFriendlyHeartbeatMonitor extends AbstractHeartbeatMonitor {
+        private static final long PROCESSING_TIMEOUT_MILLIS = 10_000L;
+
+        private final Map<NodeIdentifier, NodeHeartbeat> heartbeats = new HashMap<>();
+        private final Object mutex = new Object();
+        private long completedPasses = 0L; // guarded by mutex
+
+        public TestFriendlyHeartbeatMonitor(ClusterCoordinator clusterCoordinator, Properties properties) {
+            super(clusterCoordinator, properties);
+        }
+
+        @Override
+        protected synchronized Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats() {
+            return heartbeats;
+        }
+
+        @Override
+        public synchronized void monitorHeartbeats() {
+            super.monitorHeartbeats();
+
+            synchronized (mutex) {
+                completedPasses++;
+                mutex.notifyAll();
+            }
+        }
+
+        synchronized void addHeartbeat(final NodeHeartbeat heartbeat) {
+            heartbeats.put(heartbeat.getNodeIdentifier(), heartbeat);
+        }
+
+        @Override
+        public synchronized void removeHeartbeat(final NodeIdentifier nodeId) {
+            heartbeats.remove(nodeId);
+        }
+
+        /**
+         * Blocks until a monitoring pass completes after this call is made. Because monitorHeartbeats() and
+         * addHeartbeat() synchronize on this monitor, that pass sees any heartbeat added before this call.
+         * The wait loops on a counter so spurious wakeups cannot end it early, and gives up after a deadline
+         * instead of hanging the build.
+         *
+         * @throws InterruptedException if interrupted while waiting
+         */
+        void waitForProcessed() throws InterruptedException {
+            synchronized (mutex) {
+                final long target = completedPasses + 1;
+                final long deadline = System.currentTimeMillis() + PROCESSING_TIMEOUT_MILLIS;
+                while (completedPasses < target) {
+                    final long remaining = deadline - System.currentTimeMillis();
+                    if (remaining <= 0) {
+                        throw new AssertionError("Heartbeat monitor did not complete a monitoring pass within " + PROCESSING_TIMEOUT_MILLIS + " millis");
+                    }
+                    mutex.wait(remaining);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
index 1d42729..86057ff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
@@ -24,6 +24,10 @@ public class ConnectionException extends RuntimeException {
 
     private static final long serialVersionUID = -1378294897231234028L;
 
+    /**
+     * Creates a connection exception with explicit control over suppression and stack-trace capture by delegating to
+     * {@link RuntimeException#RuntimeException(String, Throwable, boolean, boolean)}.
+     *
+     * @param message the detail message
+     * @param cause the underlying cause (may be null)
+     * @param enableSuppression whether suppressed exceptions may be recorded
+     * @param writableStackTrace whether the stack trace should be writable
+     */
+    public ConnectionException(final String message, final Throwable cause, final boolean enableSuppression,
+            final boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
     public ConnectionException() {
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 75395b7..13a01bb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -61,6 +62,9 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -78,6 +82,8 @@ import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.connectable.Size;
 import org.apache.nifi.connectable.StandardConnection;
+import org.apache.nifi.controller.cluster.Heartbeater;
+import org.apache.nifi.controller.cluster.ZooKeeperHeartbeater;
 import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
@@ -299,10 +305,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      */
     private final StringEncryptor encryptor;
 
-    /**
-     * cluster protocol sender
-     */
-    private final NodeProtocolSender protocolSender;
 
     private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
     private final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
@@ -311,14 +313,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     /**
      * timer to periodically send heartbeats to the cluster
      */
-    private ScheduledFuture<?> heartbeatGeneratorFuture;
     private ScheduledFuture<?> heartbeatSenderFuture;
+    private final Heartbeater heartbeater;
 
     // guarded by FlowController lock
     /**
      * timer task to generate heartbeats
      */
-    private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null);
+    private final AtomicReference<HeartbeatSendTask> heartbeatSendTask = new AtomicReference<>(null);
 
     // guarded by rwLock
     /**
@@ -334,10 +336,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private String clusterManagerDN;
 
     // guarded by rwLock
-    /**
-     * true if connected to a cluster
-     */
-    private boolean connected;
+    private NodeConnectionStatus connectionStatus;
+    private final ConcurrentMap<NodeIdentifier, VersionedNodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
 
     // guarded by rwLock
     private String instanceId;
@@ -471,7 +471,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         this.configuredForClustering = configuredForClustering;
         this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), TimeUnit.SECONDS);
-        this.protocolSender = protocolSender;
         try {
             this.templateManager = new TemplateManager(properties.getTemplateDirectory());
         } catch (final IOException e) {
@@ -508,9 +507,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         // Initialize the Embedded ZooKeeper server, if applicable
-        if (properties.isStartEmbeddedZooKeeper()) {
+        if (properties.isStartEmbeddedZooKeeper() && configuredForClustering) {
             try {
                 zooKeeperStateServer = ZooKeeperStateServer.create(properties);
+                zooKeeperStateServer.start();
             } catch (final IOException | ConfigException e) {
                 throw new IllegalStateException("Unable to initailize Flow because NiFi was configured to start an Embedded Zookeeper server but failed to do so", e);
             }
@@ -526,11 +526,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
 
-        heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
+        heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, new NodeConnectionStatus(NodeConnectionState.DISCONNECTED, DisconnectionCode.NOT_YET_CONNECTED)));
+
         if (configuredForClustering) {
             leaderElectionManager = new CuratorLeaderElectionManager(4);
+            heartbeater = new ZooKeeperHeartbeater(protocolSender, properties);
         } else {
             leaderElectionManager = null;
+            heartbeater = null;
         }
     }
 
@@ -1003,6 +1006,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
+    /**
+     * @return the Heartbeater this controller publishes heartbeats with, or {@code null} when the controller
+     *         was not configured for clustering
+     */
+    public Heartbeater getHeartbeater() {
+        return this.heartbeater;
+    }
+
     /**
      * @return the BulletinRepository for storing and retrieving Bulletins
      */
@@ -1164,6 +1171,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 throw new IllegalStateException("Controller already stopped or still stopping...");
             }
 
+            if (heartbeater != null) {
+                sendShutdownNotification();
+            }
+
             if (leaderElectionManager != null) {
                 leaderElectionManager.stop();
             }
@@ -1253,6 +1264,39 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     /**
+     * Sends a notification to the cluster that the node was shut down.
+     * <p>
+     * This is best effort: it waits at most 3 seconds for the notification to be sent and never throws, so a failure
+     * here cannot abort the shutdown sequence. If the notification cannot be created or sent, a warning is logged and
+     * the cluster will only notice the departure once this node's heartbeats time out.
+     */
+    private void sendShutdownNotification() {
+        // Generate a heartbeat message and publish it, indicating that we are shutting down
+        final HeartbeatMessage heartbeatMsg = createHeartbeatMessage();
+        if (heartbeatMsg == null) {
+            // createHeartbeatMessage() logs the cause and returns null on failure; don't let that become an NPE
+            LOG.warn("Failed to create NODE_SHUTDOWN heartbeat message. Cluster may not be notified of "
+                + "shutdown and may have to wait for the heartbeats to time out before noticing that the node left the cluster");
+            return;
+        }
+
+        final Heartbeat heartbeat = heartbeatMsg.getHeartbeat();
+        final byte[] hbPayload = heartbeat.getPayload();
+        final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(DisconnectionCode.NODE_SHUTDOWN);
+        heartbeatMsg.setHeartbeat(new Heartbeat(heartbeat.getNodeIdentifier(), false, connectionStatus, hbPayload));
+
+        // The task cannot throw the checked IOException, so it records the failure here. Without this, a failed
+        // send would still complete the Future normally and be reported as a success.
+        final AtomicReference<IOException> sendFailure = new AtomicReference<>(null);
+        final Runnable sendNotification = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    heartbeater.send(heartbeatMsg);
+                } catch (final IOException e) {
+                    sendFailure.set(e);
+                }
+            }
+        };
+
+        final Future<?> hbFuture = processScheduler.submitFrameworkTask(sendNotification);
+        try {
+            hbFuture.get(3, TimeUnit.SECONDS);
+        } catch (final InterruptedException e) {
+            // Restore the interrupt flag so the caller can still observe the interruption
+            Thread.currentThread().interrupt();
+            hbFuture.cancel(false);
+            LOG.warn("Interrupted while sending NODE_SHUTDOWN heartbeat message. Cluster may not be notified of "
+                + "shutdown and may have to wait for the heartbeats to time out before noticing that the node left the cluster", e);
+            return;
+        } catch (final Exception e) {
+            // Timed out or the task failed unexpectedly; don't leave it queued if it has not started yet
+            hbFuture.cancel(false);
+            LOG.warn("Failed to send NODE_SHUTDOWN heartbeat message in time. Cluster may not be notified of "
+                + "shutdown and may have to wait for the heartbeats to time out before noticing that the node left the cluster", e);
+            return;
+        }
+
+        final IOException failure = sendFailure.get();
+        if (failure == null) {
+            LOG.info("Successfully sent Shutdown Notification to cluster");
+        } else {
+            LOG.warn("Failed to send NODE_SHUTDOWN heartbeat message. Cluster may not be notified of "
+                + "shutdown and may have to wait for the heartbeats to time out before noticing that the node left the cluster", failure);
+        }
+    }
+
+
+    /**
      * Serializes the current state of the controller to the given OutputStream
      *
      * @param serializer serializer
@@ -1374,7 +1418,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
         } finally {
             writeLock.unlock();
         }
@@ -2951,14 +2995,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         writeLock.lock();
         try {
-
             stopHeartbeating();
 
-            final HeartbeatMessageGeneratorTask heartbeatMessageGeneratorTask = new HeartbeatMessageGeneratorTask();
-            heartbeatMessageGeneratorTaskRef.set(heartbeatMessageGeneratorTask);
-            heartbeatGeneratorFuture = clusterTaskExecutor.scheduleWithFixedDelay(heartbeatMessageGeneratorTask, 0, heartbeatDelaySeconds, TimeUnit.SECONDS);
-
-            heartbeatSenderFuture = clusterTaskExecutor.scheduleWithFixedDelay(new HeartbeatSendTask(protocolSender), 250, 250, TimeUnit.MILLISECONDS);
+            final HeartbeatSendTask sendTask = new HeartbeatSendTask();
+            this.heartbeatSendTask.set(sendTask);
+            heartbeatSenderFuture = clusterTaskExecutor.scheduleWithFixedDelay(sendTask, 0, heartbeatDelaySeconds, TimeUnit.SECONDS);
         } finally {
             writeLock.unlock();
         }
@@ -2986,7 +3027,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * @throws IllegalStateException if not clustered
      */
     public void stopHeartbeating() throws IllegalStateException {
-
         if (!configuredForClustering) {
             throw new IllegalStateException("Unable to stop heartbeating because heartbeating is not configured.");
         }
@@ -2997,10 +3037,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 return;
             }
 
-            if (heartbeatGeneratorFuture != null) {
-                heartbeatGeneratorFuture.cancel(false);
-            }
-
             if (heartbeatSenderFuture != null) {
                 heartbeatSenderFuture.cancel(false);
             }
@@ -3016,8 +3052,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public boolean isHeartbeating() {
         readLock.lock();
         try {
-            return heartbeatGeneratorFuture != null && !heartbeatGeneratorFuture.isCancelled()
-                && heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
+            return heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
         } finally {
             readLock.unlock();
         }
@@ -3112,6 +3147,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             boolean isChanging = false;
             if (this.clustered != clustered) {
                 isChanging = true;
+
+                if (clustered) {
+                    LOG.info("Cluster State changed from Not Clustered to Clustered");
+                } else {
+                    LOG.info("Cluster State changed from Clustered to Not Clustered");
+                }
             }
 
             // mark the new cluster status
@@ -3185,6 +3226,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                         zooKeeperStateServer.shutdown();
                     }
                     stateManagerProvider.disableClusterProvider();
+
+                    setPrimary(false);
                 }
 
                 final List<RemoteProcessGroup> remoteGroups = getGroup(getRootGroupId()).findAllRemoteProcessGroups();
@@ -3194,7 +3237,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
         } finally {
             writeLock.unlock();
         }
@@ -3230,12 +3273,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         eventDrivenWorkerQueue.setPrimary(primary);
 
         // update the heartbeat bean
-        this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
+        final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary, connectionStatus));
 
         // Emit a bulletin detailing the fact that the primary node state has changed
-        final String message = primary ? "This node has been elected Primary Node" : "This node is no longer Primary Node";
-        final Bulletin bulletin = BulletinFactory.createBulletin("Primary Node", Severity.INFO.name(), message);
-        bulletinRepository.addBulletin(bulletin);
+        if (oldBean == null || oldBean.isPrimary() != primary) {
+            final String message = primary ? "This node has been elected Primary Node" : "This node is no longer Primary Node";
+            final Bulletin bulletin = BulletinFactory.createBulletin("Primary Node", Severity.INFO.name(), message);
+            bulletinRepository.addBulletin(bulletin);
+            LOG.info(message);
+        }
     }
 
     static boolean areEqual(final String a, final String b) {
@@ -3602,19 +3648,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public boolean isConnected() {
         rwLock.readLock().lock();
         try {
-            return connected;
+            // connectionStatus has no initializer and stays null until setConnectionStatus(...) is first called
+            return connectionStatus != null && connectionStatus.getState() == NodeConnectionState.CONNECTED;
         } finally {
             rwLock.readLock().unlock();
         }
     }
 
-    public void setConnected(final boolean connected) {
+    public void setConnectionStatus(final NodeConnectionStatus connectionStatus) {
         rwLock.writeLock().lock();
         try {
-            this.connected = connected;
+            this.connectionStatus = connectionStatus;
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
         } finally {
             rwLock.writeLock().unlock();
         }
@@ -3628,25 +3674,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             return;
         }
 
-        final HeartbeatMessageGeneratorTask task = heartbeatMessageGeneratorTaskRef.get();
+        final HeartbeatSendTask task = heartbeatSendTask.get();
         if (task != null) {
-            task.run();
+            clusterTaskExecutor.submit(task);
         }
     }
 
 
     private class HeartbeatSendTask implements Runnable {
-
-        private final NodeProtocolSender protocolSender;
         private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US);
 
-        public HeartbeatSendTask(final NodeProtocolSender protocolSender) {
-            if (protocolSender == null) {
-                throw new IllegalArgumentException("NodeProtocolSender may not be null.");
-            }
-            this.protocolSender = protocolSender;
-        }
-
         @Override
         public void run() {
             try {
@@ -3654,19 +3691,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     return;
                 }
 
-                final HeartbeatMessageGeneratorTask task = heartbeatMessageGeneratorTaskRef.get();
-                if (task == null) {
-                    return;
-                }
-
-                final HeartbeatMessage message = task.getHeartbeatMessage();
+                final HeartbeatMessage message = createHeartbeatMessage();
                 if (message == null) {
                     heartbeatLogger.debug("No heartbeat to send");
                     return;
                 }
 
                 final long sendStart = System.nanoTime();
-                protocolSender.heartbeat(message);
+                heartbeater.send(message);
+
                 final long sendNanos = System.nanoTime() - sendStart;
                 final long sendMillis = TimeUnit.NANOSECONDS.toMillis(sendNanos);
 
@@ -3679,58 +3712,44 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     heartbeatLogger.debug(usae.getMessage());
                 }
             } catch (final Throwable ex) {
-                heartbeatLogger.warn("Failed to send heartbeat to cluster manager due to: " + ex);
-                if (heartbeatLogger.isDebugEnabled()) {
-                    heartbeatLogger.warn("", ex);
-                }
+                heartbeatLogger.warn("Failed to send heartbeat due to: " + ex, ex);
             }
         }
     }
 
-    private class HeartbeatMessageGeneratorTask implements Runnable {
-
-        private final AtomicReference<HeartbeatMessage> heartbeatMessageRef = new AtomicReference<>();
-
-        @Override
-        public void run() {
-            final HeartbeatMessage heartbeatMessage = createHeartbeatMessage();
-            if (heartbeatMessage != null) {
-                heartbeatMessageRef.set(heartbeatMessage);
-            }
-        }
-
-        public HeartbeatMessage getHeartbeatMessage() {
-            return heartbeatMessageRef.getAndSet(null);
-        }
-
-        private HeartbeatMessage createHeartbeatMessage() {
-            try {
-                final HeartbeatBean bean = heartbeatBeanRef.get();
-                if (bean == null) {
-                    return null;
+    /**
+     * Builds a heartbeat message describing this node's current state.
+     * <p>
+     * The primary flag, connection status and root group are taken from the current HeartbeatBean. If no bean has
+     * been published yet, a temporary one is built under the read lock from the root group, {@code isPrimary()} and
+     * a NOT_YET_CONNECTED connection status. The payload carries the system start time, the active thread count and
+     * the FlowFile object/byte counts returned by {@code getTotalFlowFileCount} for the root group.
+     *
+     * @return the heartbeat message, or {@code null} if it could not be created (the failure is logged as a warning)
+     */
+    HeartbeatMessage createHeartbeatMessage() {
+        try {
+            HeartbeatBean bean = heartbeatBeanRef.get();
+            // Fall back to a NOT_YET_CONNECTED snapshot if no heartbeat bean has been published yet
+            if (bean == null) {
+                readLock.lock();
+                try {
+                    final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(DisconnectionCode.NOT_YET_CONNECTED);
+                    bean = new HeartbeatBean(getGroup(getRootGroupId()), isPrimary(), connectionStatus);
+                } finally {
+                    readLock.unlock();
                 }
+            }
 
-                // create heartbeat payload
-                final HeartbeatPayload hbPayload = new HeartbeatPayload();
-                hbPayload.setSystemStartTime(systemStartTime);
-                hbPayload.setActiveThreadCount(getActiveThreadCount());
+            // create heartbeat payload
+            final HeartbeatPayload hbPayload = new HeartbeatPayload();
+            hbPayload.setSystemStartTime(systemStartTime);
+            hbPayload.setActiveThreadCount(getActiveThreadCount());
 
-                final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
-                hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
-                hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
+            final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
+            hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
+            hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
 
-                // create heartbeat message
-                final Heartbeat heartbeat = new Heartbeat(getNodeId(), bean.isPrimary(), bean.isConnected(), hbPayload.marshal());
-                final HeartbeatMessage message = new HeartbeatMessage();
-                message.setHeartbeat(heartbeat);
+            // create heartbeat message
+            final Heartbeat heartbeat = new Heartbeat(getNodeId(), bean.isPrimary(), bean.getConnectionStatus(), hbPayload.marshal());
+            final HeartbeatMessage message = new HeartbeatMessage();
+            message.setHeartbeat(heartbeat);
 
-                heartbeatLogger.debug("Generated heartbeat");
+            heartbeatLogger.debug("Generated heartbeat");
 
-                return message;
-            } catch (final Throwable ex) {
-                LOG.warn("Failed to create heartbeat due to: " + ex, ex);
-                return null;
-            }
+            return message;
+        } catch (final Throwable ex) {
+            // Any failure is logged and reported to the caller as a null message
+            LOG.warn("Failed to create heartbeat due to: " + ex, ex);
+            return null;
         }
     }
 
@@ -3852,16 +3871,51 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return queues;
     }
 
+    /**
+     * Records the connection status reported for a node, unless a status with a higher Update ID is already recorded.
+     *
+     * @param nodeId the node whose status changed
+     * @param nodeStatus the newly reported status
+     * @param updateId the Update ID associated with this status change; lower values are treated as older
+     */
+    public void setNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus nodeStatus, final Long updateId) {
+        // Map values are VersionedNodeConnectionStatus rather than NodeConnectionStatus. Status changes arrive over the
+        // network, possibly on different TCP connections, so they can be delivered out of order. Pairing each status
+        // with its "Update ID" lets us refuse to overwrite a newer status with an older one.
+        // NOTE(review): the Update IDs are auto-unboxed in the comparison below, so a null updateId (incoming or
+        // stored) would throw a NullPointerException once a status for the node is already present.
+        final VersionedNodeConnectionStatus candidate = new VersionedNodeConnectionStatus(nodeStatus, updateId);
+
+        for (;;) {
+            final VersionedNodeConnectionStatus existing = nodeStatuses.putIfAbsent(nodeId, candidate);
+            if (existing == null) {
+                // First status ever recorded for this node.
+                LOG.info("Status of Node {} set to {}", nodeId, nodeStatus);
+                return;
+            }
+
+            if (updateId < existing.getUpdateId()) {
+                LOG.debug("Received notification that status of Node {} changed to {} but the status update was old. Ignoring update.", nodeId, nodeStatus);
+                return;
+            }
+
+            // Compare-and-set: if another thread changed the entry in the meantime, loop and re-evaluate.
+            if (nodeStatuses.replace(nodeId, existing, candidate)) {
+                LOG.info("Status of {} changed from {} to {}", nodeId, existing.getStatus(), nodeStatus);
+                return;
+            }
+        }
+    }
+
+    /**
+     * Returns the most recently accepted connection status for the given node.
+     *
+     * @param nodeId the node whose status is requested
+     * @return the last status accepted by {@code setNodeStatus}, or {@code null} if none has been recorded
+     */
+    public NodeConnectionStatus getNodeStatus(final NodeIdentifier nodeId) {
+        final VersionedNodeConnectionStatus latest = nodeStatuses.get(nodeId);
+        if (latest == null) {
+            return null;
+        }
+        return latest.getStatus();
+    }
+
     private static class HeartbeatBean {
 
         private final ProcessGroup rootGroup;
         private final boolean primary;
-        private final boolean connected;
+        private final NodeConnectionStatus connectionStatus;
 
-        public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary, final boolean connected) {
+        public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary, final NodeConnectionStatus connectionStatus) {
             this.rootGroup = rootGroup;
             this.primary = primary;
-            this.connected = connected;
+            this.connectionStatus = connectionStatus;
         }
 
         public ProcessGroup getRootGroup() {
@@ -3872,9 +3926,56 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             return primary;
         }
 
-        public boolean isConnected() {
-            return connected;
+        public NodeConnectionStatus getConnectionStatus() {
+            return connectionStatus;
         }
     }
 
+
+    /**
+     * A simple wrapper around a Node Connection Status and an Update ID. This is used as a value in a map so that we
+     * ensure that we update that Map only with newer versions.
+     * <p>
+     * Instances are immutable. {@code equals} and {@code hashCode} are value-based and null-safe, which matters
+     * because {@link ConcurrentMap#replace(Object, Object, Object)} compares the expected value using equality.
+     */
+    private static final class VersionedNodeConnectionStatus {
+        private final NodeConnectionStatus status;
+        private final Long updateId;
+
+        public VersionedNodeConnectionStatus(final NodeConnectionStatus status, final Long updateId) {
+            this.status = status;
+            this.updateId = updateId;
+        }
+
+        public NodeConnectionStatus getStatus() {
+            return status;
+        }
+
+        public Long getUpdateId() {
+            return updateId;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((status == null) ? 0 : status.hashCode());
+            result = prime * result + ((updateId == null) ? 0 : updateId.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+
+            // Null-safe field comparison, consistent with hashCode(), which already tolerates null fields
+            final VersionedNodeConnectionStatus other = (VersionedNodeConnectionStatus) obj;
+            return java.util.Objects.equals(status, other.status) && java.util.Objects.equals(updateId, other.updateId);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
index 00c6c5d..c648664 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
@@ -16,11 +16,13 @@
  */
 package org.apache.nifi.controller;
 
+import org.apache.nifi.cluster.ConnectionException;
+
 /**
  * Represents the exceptional case when a controller managing an existing flow fails to fully load a different flow.
  *
  */
-public class FlowSynchronizationException extends RuntimeException {
+public class FlowSynchronizationException extends ConnectionException {
 
     private static final long serialVersionUID = 109234802938L;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 67d0338..45999a3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -41,26 +41,31 @@ import java.util.zip.GZIPOutputStream;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.ConnectionException;
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.cluster.Heartbeater;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;
@@ -341,6 +346,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             case RECONNECTION_REQUEST:
             case DISCONNECTION_REQUEST:
             case FLOW_REQUEST:
+            case NODE_STATUS_CHANGE:
                 return true;
             default:
                 return false;
@@ -354,6 +360,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             switch (request.getType()) {
                 case FLOW_REQUEST:
                     return handleFlowRequest((FlowRequestMessage) request);
+                case NODE_STATUS_CHANGE:
+                    final NodeStatusChangeMessage statusChangeMsg = (NodeStatusChangeMessage) request;
+                    controller.setNodeStatus(statusChangeMsg.getNodeId(), statusChangeMsg.getNodeConnectionStatus(), statusChangeMsg.getStatusUpdateIdentifier());
+                    return null;
                 case RECONNECTION_REQUEST:
                     // Suspend heartbeats until we've reconnected. Otherwise,
                     // we may send a heartbeat while we are still in the process of
@@ -419,7 +429,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                     // set node as clustered, since it is trying to connect to a cluster
                     controller.setClustered(true, null);
                     controller.setClusterManagerRemoteSiteInfo(null, null);
-                    controller.setConnected(false);
+                    controller.setConnectionStatus(new NodeConnectionStatus(DisconnectionCode.NOT_YET_CONNECTED));
 
                     /*
                      * Start heartbeating.  Heartbeats will fail because we can't reach
@@ -444,23 +454,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                         loadFromConnectionResponse(response);
                     } catch (final ConnectionException ce) {
                         logger.error("Failed to load flow from cluster due to: " + ce, ce);
-
-                        /*
-                         * If we failed processing the response, then we want to notify
-                         * the manager so that it can mark the node as disconnected.
-                         */
-                        // create error message
-                        final ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage();
-                        msg.setExceptionMessage(ce.getMessage());
-                        msg.setNodeId(response.getNodeIdentifier());
-
-                        // send error message to manager
-                        try {
-                            senderListener.notifyControllerStartupFailure(msg);
-                        } catch (final ProtocolException | UnknownServiceAddressException e) {
-                            logger.warn("Failed to notify cluster manager of controller startup failure due to: " + e, e);
-                        }
-
+                        handleConnectionFailure(ce);
                         throw new IOException(ce);
                     }
                 }
@@ -474,6 +468,33 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         }
     }
 
+    /**
+     * Best-effort notification to the Cluster Coordinator that this node failed to connect (or reconnect)
+     * to the cluster. The failure is reported as a Heartbeat whose NodeConnectionStatus carries a
+     * DisconnectionCode describing the kind of failure. Errors while building or sending the notification
+     * are logged rather than thrown, because the caller is already handling the original failure.
+     *
+     * @param ex the exception that caused the connection attempt to fail
+     */
+    private void handleConnectionFailure(final Exception ex) {
+        final Heartbeater heartbeater = controller.getHeartbeater();
+        if (heartbeater == null) {
+            // Nothing to send the notification through; make the dropped notification visible rather than silently ignoring it.
+            logger.warn("Unable to notify Cluster Coordinator that Connection failed because no Heartbeater is available; failure was: " + ex);
+            return;
+        }
+
+        final NodeConnectionStatus connectionStatus;
+        if (ex instanceof UninheritableFlowException || ex instanceof FlowSynchronizationException) {
+            // The local flow could not be reconciled with the cluster's flow
+            connectionStatus = new NodeConnectionStatus(DisconnectionCode.MISMATCHED_FLOWS, ex.toString());
+        } else {
+            connectionStatus = new NodeConnectionStatus(DisconnectionCode.STARTUP_FAILURE, ex.toString());
+        }
+
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            HeartbeatPayload.marshal(new HeartbeatPayload(), baos);
+            final byte[] payload = baos.toByteArray();
+
+            final Heartbeat failureHeartbeat = new Heartbeat(nodeId, false, connectionStatus, payload);
+            final HeartbeatMessage startupFailureMessage = new HeartbeatMessage();
+            startupFailureMessage.setHeartbeat(failureHeartbeat);
+            heartbeater.send(startupFailureMessage);
+        } catch (final Exception e) {
+            // Deliberately best-effort: the original failure is still logged/propagated by the caller.
+            logger.error("Failed to notify Cluster Coordinator that Connection failed", e);
+        }
+    }
+
     private FlowResponseMessage handleFlowRequest(final FlowRequestMessage request) throws ProtocolException {
         readLock.lock();
         try {
@@ -509,7 +530,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             logger.info("Processing reconnection request from manager.");
 
             // reconnect
-            final ConnectionResponse connectionResponse = new ConnectionResponse(nodeId, request.getDataFlow(), request.isPrimary(),
+            final ConnectionResponse connectionResponse = new ConnectionResponse(nodeId, request.getDataFlow(),
                 request.getManagerRemoteSiteListeningPort(), request.isManagerRemoteSiteCommsSecure(), request.getInstanceId());
             connectionResponse.setClusterManagerDN(request.getRequestorDN());
             loadFromConnectionResponse(connectionResponse);
@@ -520,21 +541,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         } catch (final Exception ex) {
             // disconnect controller
             if (controller.isClustered()) {
-                disconnect();
+                disconnect("Failed to properly handle Reconnection request due to " + ex.toString());
             }
 
             logger.error("Handling reconnection request failed due to: " + ex, ex);
-
-            final ReconnectionFailureMessage failureMessage = new ReconnectionFailureMessage();
-            failureMessage.setNodeId(request.getNodeId());
-            failureMessage.setExceptionMessage(ex.toString());
-
-            // send error message to manager
-            try {
-                senderListener.notifyReconnectionFailure(failureMessage);
-            } catch (final ProtocolException | UnknownServiceAddressException e) {
-                logger.warn("Failed to notify cluster manager of controller reconnection failure due to: " + e, e);
-            }
+            handleConnectionFailure(ex);
         } finally {
             writeLock.unlock();
         }
@@ -544,20 +555,20 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         writeLock.lock();
         try {
             logger.info("Received disconnection request message from manager with explanation: " + request.getExplanation());
-            disconnect();
+            disconnect(request.getExplanation());
         } finally {
             writeLock.unlock();
         }
     }
 
-    private void disconnect() {
+    private void disconnect(final String explanation) {
         writeLock.lock();
         try {
 
             logger.info("Disconnecting node.");
 
             // mark node as not connected
-            controller.setConnected(false);
+            controller.setConnectionStatus(new NodeConnectionStatus(DisconnectionCode.UNKNOWN, explanation));
 
             // turn off primary flag
             controller.setPrimary(false);
@@ -727,7 +738,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             controller.setClustered(true, response.getInstanceId(), response.getClusterManagerDN());
             controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(), response.isManagerRemoteCommsSecure());
 
-            controller.setConnected(true);
+            controller.setConnectionStatus(new NodeConnectionStatus(NodeConnectionState.CONNECTED));
 
             // start the processors as indicated by the dataflow
             controller.onFlowInitialized(dataFlow.isAutoStartProcessors());
@@ -736,11 +747,12 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             loadSnippets(dataFlow.getSnippets());
             controller.startHeartbeating();
         } catch (final UninheritableFlowException ufe) {
-            throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow is different than cluster flow.", ufe);
+            throw new UninheritableFlowException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow is different than cluster flow.", ufe);
         } catch (final FlowSerializationException fse) {
             throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local or cluster flow is malformed.", fse);
         } catch (final FlowSynchronizationException fse) {
-            throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow controller partially updated.  Administrator should disconnect node and review flow for corruption.", fse);
+            throw new FlowSynchronizationException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow controller partially updated. "
+                + "Administrator should disconnect node and review flow for corruption.", fse);
         } catch (final Exception ex) {
             throw new ConnectionException("Failed to connect node to cluster due to: " + ex, ex);
         } finally {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
index 1b4f9d9..d7fd6d8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
@@ -16,11 +16,13 @@
  */
 package org.apache.nifi.controller;
 
+import org.apache.nifi.cluster.ConnectionException;
+
 /**
  * Represents the exceptional case when a controller is to be loaded with a flow that is fundamentally different than its existing flow.
  *
  */
-public class UninheritableFlowException extends RuntimeException {
+public class UninheritableFlowException extends ConnectionException {
 
     private static final long serialVersionUID = 198234798234794L;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/Heartbeater.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/Heartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/Heartbeater.java
new file mode 100644
index 0000000..e5379d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/Heartbeater.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+
+/**
+ * <p>
+ * A mechanism for sending a heartbeat to a remote resource to indicate
+ * that the node is still an active participant in the cluster.
+ * </p>
+ */
+public interface Heartbeater extends Closeable {
+
+    /**
+     * Sends the given heartbeat message to the remote resource.
+     *
+     * @param heartbeatMessage the HeartbeatMessage to send
+     * @throws IOException if unable to communicate with the remote resource
+     */
+    void send(HeartbeatMessage heartbeatMessage) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java
new file mode 100644
index 0000000..8571f51
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cluster;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperClientConfig {
+    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperClientConfig.class);
+
+    private final String connectString;
+    private final int sessionTimeoutMillis;
+    private final int connectionTimeoutMillis;
+    private final String rootPath;
+    private final List<ACL> acls;
+
+    private ZooKeeperClientConfig(String connectString, int sessionTimeoutMillis, int connectionTimeoutMillis, String rootPath, List<ACL> acls) {
+        this.connectString = connectString;
+        this.sessionTimeoutMillis = sessionTimeoutMillis;
+        this.connectionTimeoutMillis = connectionTimeoutMillis;
+        this.rootPath = rootPath.endsWith("/") ? rootPath.substring(0, rootPath.length() - 1) : rootPath;
+        this.acls = acls;
+    }
+
+    public String getConnectString() {
+        return connectString;
+    }
+
+    public int getSessionTimeoutMillis() {
+        return sessionTimeoutMillis;
+    }
+
+    public int getConnectionTimeoutMillis() {
+        return connectionTimeoutMillis;
+    }
+
+    public String getRootPath() {
+        return rootPath;
+    }
+
+    public List<ACL> getACLs() {
+        return acls;
+    }
+
+    public String resolvePath(final String path) {
+        if (path.startsWith("/")) {
+            return rootPath + path;
+        }
+
+        return rootPath + "/" + path;
+    }
+
+    public static ZooKeeperClientConfig createConfig(final Properties properties) {
+        final String connectString = properties.getProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING);
+        if (connectString == null || connectString.trim().isEmpty()) {
+            throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is not set in nifi.properties");
+        }
+
+        final long sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
+        final long connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT);
+        final String rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE);
+        final String accessControl = properties.getProperty(NiFiProperties.ZOOKEEPER_ACCESS_CONTROL);
+
+        final List<ACL> acls;
+        if (accessControl == null || accessControl.trim().isEmpty()) {
+            acls = null;
+        } else if (accessControl.equalsIgnoreCase("Open")) {
+            acls = Ids.OPEN_ACL_UNSAFE;
+        } else if (accessControl.equalsIgnoreCase("CreatorOnly")) {
+            acls = Ids.CREATOR_ALL_ACL;
+        } else {
+            acls = null;
+        }
+
+        try {
+            PathUtils.validatePath(rootPath);
+        } catch (final IllegalArgumentException e) {
+            throw new IllegalArgumentException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath);
+        }
+
+        return new ZooKeeperClientConfig(connectString, (int) sessionTimeoutMs, (int) connectionTimeoutMs, rootPath, acls);
+    }
+
+    private static int getTimePeriod(final Properties properties, final String propertyName, final String defaultValue) {
+        final String timeout = properties.getProperty(propertyName, defaultValue);
+        try {
+            return (int) FormatUtils.getTimeDuration(timeout, TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            logger.warn("Value of '" + propertyName + "' property is set to '" + timeout + "', which is not a valid time period. Using default of " + defaultValue);
+            return (int) FormatUtils.getTimeDuration(defaultValue, TimeUnit.MILLISECONDS);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java
new file mode 100644
index 0000000..4348cec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cluster;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryForever;
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses ZooKeeper in order to determine which node is the elected Cluster Coordinator and to indicate
+ * that this node is part of the cluster. However, once the Cluster Coordinator is known, heartbeats are
+ * sent directly to the Cluster Coordinator.
+ * <p>
+ * The coordinator's address is read from the {@code cluster/nodes/coordinator} ZNode beneath the configured
+ * root path. It is cached until a ZooKeeper watch event arrives for that ZNode, or until sending a heartbeat
+ * fails with an underlying IOException. After either event it is looked up again on the next send.
+ * </p>
+ */
+public class ZooKeeperHeartbeater implements Heartbeater {
+    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperHeartbeater.class);
+
+    private final NodeProtocolSender protocolSender;
+    private final CuratorFramework curatorClient;
+    private final String nodesPathPrefix;
+
+    private final String coordinatorPath;
+    private volatile String coordinatorAddress;
+
+    // Clears the cached coordinator address whenever a watch event is delivered for the coordinator ZNode,
+    // so that the next heartbeat re-reads it. One instance is reused for every lookup instead of creating
+    // a new watcher object each time the cache is cleared.
+    private final Watcher coordinatorWatcher = new Watcher() {
+        @Override
+        public void process(final WatchedEvent event) {
+            coordinatorAddress = null;
+        }
+    };
+
+    /**
+     * Creates the heartbeater and starts its ZooKeeper (Curator) client.
+     *
+     * @param protocolSender the sender used to deliver heartbeats to the Cluster Coordinator
+     * @param properties the NiFi properties from which the ZooKeeper client configuration is read
+     */
+    public ZooKeeperHeartbeater(final NodeProtocolSender protocolSender, final Properties properties) {
+        this.protocolSender = protocolSender;
+
+        final RetryPolicy retryPolicy = new RetryForever(5000);
+        final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(properties);
+
+        curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
+            zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy);
+
+        curatorClient.start();
+        nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
+        coordinatorPath = nodesPathPrefix + "/coordinator";
+    }
+
+    /**
+     * Returns the cached Cluster Coordinator address. If no address is cached, it reads the address from
+     * ZooKeeper, registers the watcher on the ZNode and caches the result.
+     *
+     * @throws IOException if the address cannot be read from ZooKeeper, or the ZNode has no data
+     */
+    private String getHeartbeatAddress() throws IOException {
+        final String curAddress = coordinatorAddress;
+        if (curAddress != null) {
+            return curAddress;
+        }
+
+        final byte[] coordinatorAddressBytes;
+        try {
+            // Get coordinator address and add watcher to change who we are heartbeating to if the value changes.
+            coordinatorAddressBytes = curatorClient.getData().usingWatcher(coordinatorWatcher).forPath(coordinatorPath);
+        } catch (final InterruptedException ie) {
+            // Preserve the interrupt so the calling thread can still observe it
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while determining Cluster Coordinator from ZooKeeper", ie);
+        } catch (final Exception e) {
+            throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
+        }
+
+        if (coordinatorAddressBytes == null) {
+            throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper: ZNode " + coordinatorPath + " contains no data");
+        }
+
+        final String address = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);
+        coordinatorAddress = address;
+
+        logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);
+        return address;
+    }
+
+    /**
+     * Sends the given heartbeat to the Cluster Coordinator. If the coordinator's address is not
+     * already cached, it is first looked up in ZooKeeper.
+     *
+     * @throws IOException if the coordinator's address cannot be determined or the heartbeat cannot be sent
+     */
+    @Override
+    public synchronized void send(final HeartbeatMessage heartbeatMessage) throws IOException {
+        final String heartbeatAddress = getHeartbeatAddress();
+
+        try {
+            protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
+        } catch (final ProtocolException pe) {
+            // a ProtocolException is likely the result of not being able to communicate
+            // with the coordinator. If we do get an IOException communicating with the coordinator,
+            // it will be the cause of the Protocol Exception. In this case, set coordinatorAddress
+            // to null so that we double-check next time that the coordinator has not changed.
+            if (pe.getCause() instanceof IOException) {
+                coordinatorAddress = null;
+            }
+
+            throw pe;
+        }
+    }
+
+    /**
+     * Closes the underlying ZooKeeper client; heartbeats can no longer be sent afterwards.
+     */
+    @Override
+    public void close() throws IOException {
+        // curatorClient is final and always assigned in the constructor, so no null check is needed
+        curatorClient.close();
+        logger.info("ZooKeeper heartbeater closed. Will no longer send Heartbeat messages to the Cluster Coordinator");
+    }
+}