You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/22 21:01:50 UTC

[4/4] nifi git commit: NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination

NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination

Added configuration options for ZooKeeper username & password for heartbeat management. Also addressed issue where nodes that were previously disconnected were asked to disconnect upon restart

Ensure that ACL is set properly when creating heartbeat node. Removed unused ControllerStartupFailureMessage.java

Changed ZooKeeper ACLs so that container nodes that would not be sensitive are wide open and removed the usage of username & password when communicating with ZooKeeper. This was done specifically because username/password combination is considered a 'testing' feature that should not be used in production and is not supported by Apache Curator

Refactored CuratorHeartbeatMonitor into an abstract heartbeat monitor that is responsible for processing heartbeats and CuratorHeartbeatMonitor that is responsible for retrieving heartbeat information

Refactored so that heartbeats are sent to Cluster Coordinator directly instead of to ZooKeeper. ZooKeeper is used to know which node is the cluster coordinator but heartbeats to the Cluster Coordinator provide additional information about the nodes.

Code cleanup and incorporate comments from peer review

This closes #323


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fb7b3fe4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fb7b3fe4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fb7b3fe4

Branch: refs/heads/master
Commit: fb7b3fe4b87b8c9f45a43d460a3020947ef55dcc
Parents: 9ea2275
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Mar 24 11:49:08 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Apr 22 15:01:04 2016 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   1 +
 .../org/apache/nifi/util/NiFiProperties.java    |   1 +
 .../coordination/node/DisconnectionCode.java    |  75 ++++
 .../coordination/node/NodeConnectionState.java  |  50 +++
 .../coordination/node/NodeConnectionStatus.java |  94 ++++
 .../protocol/ClusterManagerProtocolSender.java  |  11 +
 .../cluster/protocol/ConnectionResponse.java    |  10 +-
 .../apache/nifi/cluster/protocol/Heartbeat.java |  12 +-
 .../nifi/cluster/protocol/NodeIdentifier.java   |  25 +-
 .../cluster/protocol/NodeProtocolSender.java    |  35 +-
 .../impl/ClusterManagerProtocolSenderImpl.java  |  44 ++
 .../ClusterManagerProtocolSenderListener.java   |   8 +
 .../protocol/impl/NodeProtocolSenderImpl.java   |  49 ++-
 .../impl/NodeProtocolSenderListener.java        |  20 +-
 .../jaxb/message/AdaptedConnectionResponse.java |   9 -
 .../protocol/jaxb/message/AdaptedHeartbeat.java |  12 +-
 .../message/AdaptedNodeConnectionStatus.java    |  60 +++
 .../jaxb/message/ConnectionResponseAdapter.java |   3 +-
 .../protocol/jaxb/message/HeartbeatAdapter.java |   5 +-
 .../message/NodeConnectionStatusAdapter.java    |  40 ++
 .../protocol/jaxb/message/ObjectFactory.java    |  11 +-
 .../ControllerStartupFailureMessage.java        |  49 ---
 .../protocol/message/HeartbeatMessage.java      |  12 +-
 .../message/NodeStatusChangeMessage.java        |  62 +++
 .../protocol/message/ProtocolMessage.java       |   3 +-
 .../message/ReconnectionFailureMessage.java     |  47 --
 .../impl/NodeProtocolSenderImplTest.java        |  31 +-
 .../nifi-framework-cluster/pom.xml              |  13 +
 .../coordination/ClusterCoordinator.java        | 127 ++++++
 .../heartbeat/AbstractHeartbeatMonitor.java     | 283 ++++++++++++
 .../ClusterProtocolHeartbeatMonitor.java        | 241 ++++++++++
 .../heartbeat/HeartbeatMonitor.java             |  55 +++
 .../coordination/heartbeat/NodeHeartbeat.java   |  64 +++
 .../heartbeat/StandardNodeHeartbeat.java        |  98 +++++
 .../cluster/coordination/node/ClusterNode.java  |  39 ++
 .../nifi/cluster/manager/ClusterManager.java    |   9 -
 .../cluster/manager/impl/WebClusterManager.java | 437 ++-----------------
 .../impl/WebClusterManagerCoordinator.java      | 246 +++++++++++
 .../java/org/apache/nifi/cluster/node/Node.java |  66 +--
 .../heartbeat/TestAbstractHeartbeatMonitor.java | 350 +++++++++++++++
 .../nifi/cluster/ConnectionException.java       |   4 +
 .../apache/nifi/controller/FlowController.java  | 303 ++++++++-----
 .../FlowSynchronizationException.java           |   4 +-
 .../nifi/controller/StandardFlowService.java    |  92 ++--
 .../controller/UninheritableFlowException.java  |   4 +-
 .../nifi/controller/cluster/Heartbeater.java    |  41 ++
 .../cluster/ZooKeeperClientConfig.java          | 117 +++++
 .../cluster/ZooKeeperHeartbeater.java           | 117 +++++
 .../election/CuratorLeaderElectionManager.java  |  37 +-
 .../scheduling/StandardProcessScheduler.java    |   4 +-
 .../zookeeper/ZooKeeperStateProvider.java       |  61 +--
 .../state/server/ZooKeeperStateServer.java      |  10 +-
 .../zookeeper/TestZooKeeperStateProvider.java   |  87 ----
 .../src/main/resources/conf/logback.xml         |   7 +
 .../src/main/resources/conf/nifi.properties     |   1 +
 .../main/resources/conf/state-management.xml    |  14 +-
 .../nifi/web/StandardNiFiServiceFacade.java     |  17 +-
 .../nifi/web/api/ControllerServiceResource.java |   2 +-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  18 +-
 59 files changed, 2688 insertions(+), 1059 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index e85c83f..7f92a2a 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -457,6 +457,7 @@ language governing permissions and limitations under the License. -->
         <nifi.zookeeper.connect.timeout>3 secs</nifi.zookeeper.connect.timeout>
         <nifi.zookeeper.session.timeout>3 secs</nifi.zookeeper.session.timeout>
         <nifi.zookeeper.root.node>/nifi</nifi.zookeeper.root.node>
+        <nifi.zookeeper.access.control>Open</nifi.zookeeper.access.control>
 
         <!-- nifi.properties: kerberos properties -->
         <nifi.kerberos.krb5.file> </nifi.kerberos.krb5.file>

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 63693bf..a6387ad 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -172,6 +172,7 @@ public class NiFiProperties extends Properties {
     public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout";
     public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout";
     public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node";
+    public static final String ZOOKEEPER_ACCESS_CONTROL = "nifi.zookeeper.access.control";
 
     // cluster manager properties
     public static final String CLUSTER_IS_MANAGER = "nifi.cluster.is.manager";

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
new file mode 100644
index 0000000..bd20c3f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+/**
+ * An enumeration of the reasons that a node may be disconnected from the cluster.
+ * Note that {@link #toString()} returns the human-readable description, not the constant name.
+ */
+public enum DisconnectionCode {
+    /**
+     * The node was disconnected for an unreported reason
+     */
+    UNKNOWN("Unknown Reason"),
+
+    /**
+     * The node has not yet connected to the cluster
+     */
+    NOT_YET_CONNECTED("Node Has Not Yet Connected to Cluster"),
+
+    /**
+     * A user explicitly disconnected the node from the cluster
+     */
+    USER_DISCONNECTED("User Disconnected Node"),
+
+    /**
+     * The node was disconnected because it stopped heartbeating
+     */
+    LACK_OF_HEARTBEAT("Lack of Heartbeat"),
+
+    /**
+     * The firewall prevented the node from joining the cluster
+     */
+    BLOCKED_BY_FIREWALL("Blocked by Firewall"),
+
+    /**
+     * The node failed to start up properly
+     */
+    STARTUP_FAILURE("Node Failed to Startup Properly"),
+
+    /**
+     * The node's flow did not match the cluster's flow
+     */
+    MISMATCHED_FLOWS("Node's Flow did not Match Cluster Flow"),
+
+    /**
+     * Node is being shut down
+     */
+    NODE_SHUTDOWN("Node was Shutdown");
+
+    private final String description; // human-readable text returned by toString()
+
+    private DisconnectionCode(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String toString() {
+        return description; // the description, e.g. "Lack of Heartbeat", rather than name()
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java
new file mode 100644
index 0000000..9627b2d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+import javax.xml.bind.annotation.XmlEnum;
+
+@XmlEnum(String.class)
+public enum NodeConnectionState {
+    /**
+     * A node has issued a connection request to the cluster, but has not yet
+     * sent a heartbeat. A CONNECTING node can transition to DISCONNECTED or
+     * CONNECTED. The cluster will not accept any external requests to change
+     * the flow while any node is in this state.
+     */
+    CONNECTING,
+
+    /**
+     * A node that is connected to the cluster. A CONNECTING node transitions
+     * to CONNECTED after the cluster receives the node's first heartbeat. A
+     * CONNECTED node can transition to DISCONNECTING.
+     */
+    CONNECTED,
+
+    /**
+     * A node that is in the process of disconnecting from the cluster.
+     * A DISCONNECTING node will always transition to DISCONNECTED.
+     */
+    DISCONNECTING,
+
+    /**
+     * A node that is not connected to the cluster.
+     * A DISCONNECTED node can transition to CONNECTING.
+     */
+    DISCONNECTED
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
new file mode 100644
index 0000000..cd2a6a7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeConnectionStatusAdapter;
+
+/**
+ * Describes the current status of a node: its connection state and, when supplied, the disconnection code/reason and the connection request time
+ */
+@XmlJavaTypeAdapter(NodeConnectionStatusAdapter.class)
+public class NodeConnectionStatus {
+    private final NodeConnectionState state;
+    private final DisconnectionCode disconnectCode;
+    private final String disconnectReason;
+    private final Long connectionRequestTime; // null when no request time was supplied
+
+    public NodeConnectionStatus(final NodeConnectionState state) {
+        this(state, null, null, null);
+    }
+
+    public NodeConnectionStatus(final NodeConnectionState state, final long connectionRequestTime) {
+        this(state, null, null, connectionRequestTime);
+    }
+
+    public NodeConnectionStatus(final DisconnectionCode disconnectionCode) {
+        this(NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionCode == null ? null : disconnectionCode.toString(), null); // reason defaults to the code's description
+    }
+
+    public NodeConnectionStatus(final DisconnectionCode disconnectionCode, final String disconnectionExplanation) {
+        this(NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionExplanation, null);
+    }
+
+    public NodeConnectionStatus(final NodeConnectionState state, final DisconnectionCode disconnectionCode) {
+        this(state, disconnectionCode, disconnectionCode == null ? null : disconnectionCode.toString(), null); // null-safe; reason defaults to the code's description
+    }
+
+    public NodeConnectionStatus(final NodeConnectionState state, final DisconnectionCode disconnectCode, final String disconnectReason, final Long connectionRequestTime) {
+        this.state = state;
+        if (state == NodeConnectionState.DISCONNECTED && disconnectCode == null) { // a DISCONNECTED status always carries a code
+            this.disconnectCode = DisconnectionCode.UNKNOWN;
+            this.disconnectReason = this.disconnectCode.toString();
+        } else {
+            this.disconnectCode = disconnectCode;
+            this.disconnectReason = disconnectReason;
+        }
+
+        this.connectionRequestTime = connectionRequestTime;
+    }
+
+    public NodeConnectionState getState() {
+        return state;
+    }
+
+    public DisconnectionCode getDisconnectCode() {
+        return disconnectCode;
+    }
+
+    public String getDisconnectReason() {
+        return disconnectReason;
+    }
+
+    public Long getConnectionRequestTime() {
+        return connectionRequestTime;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        final NodeConnectionState state = getState();
+        sb.append("NodeConnectionStatus[state=").append(state);
+        if (state == NodeConnectionState.DISCONNECTED || state == NodeConnectionState.DISCONNECTING) { // disconnect details are only printed for these two states
+            sb.append(", Disconnect Code=").append(getDisconnectCode()).append(", Disconnect Reason=").append(getDisconnectReason());
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
index bdefbbf..010aed7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
@@ -16,9 +16,12 @@
  */
 package org.apache.nifi.cluster.protocol;
 
+import java.util.Set;
+
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.reporting.BulletinRepository;
@@ -61,4 +64,12 @@ public interface ClusterManagerProtocolSender {
      * @param bulletinRepository repo
      */
     void setBulletinRepository(final BulletinRepository bulletinRepository);
+
+    /**
+     * Notifies all nodes in the given set that a node in the cluster has a new status
+     *
+     * @param nodesToNotify the nodes that should be notified of the change
+     * @param msg the message that indicates which node's status changed and what it changed to
+     */
+    void notifyNodeStatusChange(Set<NodeIdentifier> nodesToNotify, NodeStatusChangeMessage msg);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
index 96bde72..e6d8cf6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
@@ -35,14 +35,13 @@ public class ConnectionResponse {
     private final int tryLaterSeconds;
     private final NodeIdentifier nodeIdentifier;
     private final StandardDataFlow dataFlow;
-    private final boolean primary;
     private final Integer managerRemoteInputPort;
     private final Boolean managerRemoteCommsSecure;
     private final String instanceId;
 
     private volatile String clusterManagerDN;
 
-    public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary,
+    public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow,
         final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) {
         if (nodeIdentifier == null) {
             throw new IllegalArgumentException("Node identifier may not be empty or null.");
@@ -53,7 +52,6 @@ public class ConnectionResponse {
         this.dataFlow = dataFlow;
         this.tryLaterSeconds = 0;
         this.rejectionReason = null;
-        this.primary = primary;
         this.managerRemoteInputPort = managerRemoteInputPort;
         this.managerRemoteCommsSecure = managerRemoteCommsSecure;
         this.instanceId = instanceId;
@@ -67,7 +65,6 @@ public class ConnectionResponse {
         this.nodeIdentifier = null;
         this.tryLaterSeconds = tryLaterSeconds;
         this.rejectionReason = null;
-        this.primary = false;
         this.managerRemoteInputPort = null;
         this.managerRemoteCommsSecure = null;
         this.instanceId = null;
@@ -78,7 +75,6 @@ public class ConnectionResponse {
         this.nodeIdentifier = null;
         this.tryLaterSeconds = 0;
         this.rejectionReason = rejectionReason;
-        this.primary = false;
         this.managerRemoteInputPort = null;
         this.managerRemoteCommsSecure = null;
         this.instanceId = null;
@@ -96,10 +92,6 @@ public class ConnectionResponse {
         return new ConnectionResponse(explanation);
     }
 
-    public boolean isPrimary() {
-        return primary;
-    }
-
     public boolean shouldTryLater() {
         return tryLaterSeconds > 0;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
index 80a4ba7..f2b0fde 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
@@ -19,6 +19,8 @@ package org.apache.nifi.cluster.protocol;
 import java.util.Date;
 import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter;
 
 /**
@@ -30,17 +32,17 @@ public class Heartbeat {
 
     private final NodeIdentifier nodeIdentifier;
     private final boolean primary;
-    private final boolean connected;
+    private final NodeConnectionStatus connectionStatus;
     private final long createdTimestamp;
     private final byte[] payload;
 
-    public Heartbeat(final NodeIdentifier nodeIdentifier, final boolean primary, final boolean connected, final byte[] payload) {
+    public Heartbeat(final NodeIdentifier nodeIdentifier, final boolean primary, final NodeConnectionStatus connectionStatus, final byte[] payload) {
         if (nodeIdentifier == null) {
             throw new IllegalArgumentException("Node Identifier may not be null.");
         }
         this.nodeIdentifier = nodeIdentifier;
         this.primary = primary;
-        this.connected = connected;
+        this.connectionStatus = connectionStatus;
         this.payload = payload;
         this.createdTimestamp = new Date().getTime();
     }
@@ -57,8 +59,8 @@ public class Heartbeat {
         return primary;
     }
 
-    public boolean isConnected() {
-        return connected;
+    public NodeConnectionStatus getConnectionStatus() {
+        return connectionStatus;
     }
 
     @XmlTransient

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
index cb4dd6a..568fc20 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
@@ -16,6 +16,10 @@
  */
 package org.apache.nifi.cluster.protocol;
 
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
 import org.apache.commons.lang3.StringUtils;
 
 /**
@@ -32,6 +36,8 @@ import org.apache.commons.lang3.StringUtils;
  * @Immutable
  * @Threadsafe
  */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
 public class NodeIdentifier {
     /**
      * the unique identifier for the node
@@ -110,6 +116,21 @@ public class NodeIdentifier {
         this.siteToSiteSecure = siteToSiteSecure;
     }
 
+    /**
+     * No-argument constructor that exists solely for the use of JAXB and should not be used otherwise; every field is left at null, 0 or false
+     */
+    public NodeIdentifier() {
+        this.id = null;
+        this.apiAddress = null;
+        this.apiPort = 0;
+        this.socketAddress = null;
+        this.socketPort = 0;
+        this.nodeDn = null;
+        this.siteToSiteAddress = null;
+        this.siteToSitePort = null;
+        this.siteToSiteSecure = false;
+    }
+
     public String getId() {
         return id;
     }
@@ -213,9 +234,7 @@ public class NodeIdentifier {
 
     @Override
     public String toString() {
-        return "[" + "id=" + id + ", apiAddress=" + apiAddress + ", apiPort=" + apiPort
-            + ", socketAddress=" + socketAddress + ", socketPort=" + socketPort
-            + ", siteToSiteAddress=" + siteToSiteAddress + ", siteToSitePort=" + siteToSitePort + ']';
+        return apiAddress + ":" + apiPort;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
index be0c339..432a03d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
@@ -18,9 +18,7 @@ package org.apache.nifi.cluster.protocol;
 
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 
 /**
  * An interface for sending protocol messages from a node to the cluster
@@ -41,34 +39,11 @@ public interface NodeProtocolSender {
     ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException;
 
     /**
-     * Sends a "heartbeat" message to the cluster manager.
+     * Sends a heartbeat to the address given
      *
-     * @param msg a message
-     * @throws UnknownServiceAddressException if the cluster manager's address
-     * is not known
-     * @throws ProtocolException if communication failed
+     * @param msg the heartbeat message to send
+     * @param address the address of the Cluster Coordinator in &lt;hostname&gt;:&lt;port&gt; format
+     * @throws ProtocolException if unable to send the heartbeat
      */
-    void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
-    /**
-     * Sends a failure notification if the controller was unable start.
-     *
-     * @param msg a message
-     * @throws UnknownServiceAddressException if the cluster manager's address
-     * is not known
-     * @throws ProtocolException if communication failed
-     */
-    void notifyControllerStartupFailure(ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
-    /**
-     * Sends a failure notification if the node was unable to reconnect to the
-     * cluster
-     *
-     * @param msg a message
-     * @throws UnknownServiceAddressException if the cluster manager's address
-     * is not known
-     * @throws ProtocolException if communication failed
-     */
-    void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
+    void heartbeat(HeartbeatMessage msg, String address) throws ProtocolException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
index fb9292e..63ba5ba 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
@@ -16,10 +16,14 @@
  */
 package org.apache.nifi.cluster.protocol.impl;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
@@ -31,6 +35,7 @@ import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@@ -39,6 +44,7 @@ import org.apache.nifi.io.socket.SocketConfiguration;
 import org.apache.nifi.io.socket.SocketUtils;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
 
 /**
  * A protocol sender for sending protocol messages from the cluster manager to
@@ -216,4 +222,42 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
             throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
         }
     }
+
+    @Override
+    public void notifyNodeStatusChange(final Set<NodeIdentifier> nodesToNotify, final NodeStatusChangeMessage msg) {
+        final NiFiProperties properties = NiFiProperties.getInstance();
+        final int numThreads = Math.min(nodesToNotify.size(), properties.getClusterManagerProtocolThreads());
+
+        final byte[] msgBytes;
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+            marshaller.marshal(msg, baos);
+            msgBytes = baos.toByteArray();
+        } catch (final IOException e) {
+            throw new ProtocolException("Failed to marshal NodeStatusChangeMessage", e);
+        }
+
+        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+        for (final NodeIdentifier nodeId : nodesToNotify) {
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try (final Socket socket = createSocket(nodeId, true)) {
+                        // marshal message to output stream
+                        socket.getOutputStream().write(msgBytes);
+                    } catch (final IOException ioe) {
+                        throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, ioe);
+                    }
+                }
+            });
+        }
+
+        executor.shutdown();
+
+        try {
+            executor.awaitTermination(10, TimeUnit.DAYS);
+        } catch (final InterruptedException ie) {
+            throw new ProtocolException(ie);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
index 54d33a8..2fc05b0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
@@ -18,14 +18,17 @@ package org.apache.nifi.cluster.protocol.impl;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Set;
 
 import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.ProtocolListener;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.reporting.BulletinRepository;
@@ -107,4 +110,9 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
     public void disconnect(DisconnectMessage msg) throws ProtocolException {
         sender.disconnect(msg);
     }
+
+    @Override
+    public void notifyNodeStatusChange(Set<NodeIdentifier> nodesToNotify, NodeStatusChangeMessage msg) {
+        sender.notifyNodeStatusChange(nodesToNotify, msg);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
index 886553e..10a58cf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.cluster.protocol.impl;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.security.cert.CertificateException;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
@@ -27,11 +28,9 @@ import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.io.socket.SocketConfiguration;
 import org.apache.nifi.io.socket.SocketUtils;
 import org.apache.nifi.io.socket.multicast.DiscoverableService;
@@ -95,6 +94,20 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
         }
     }
 
+    public void heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
+        final String hostname;
+        final int port;
+        try {
+            final String[] parts = address.split(":");
+            hostname = parts[0];
+            port = Integer.parseInt(parts[1]);
+        } catch (final Exception e) {
+            throw new IllegalArgumentException("Cannot send heartbeat to address [" + address + "]. Address must be in <hostname>:<port> format");
+        }
+
+        sendProtocolMessage(msg, hostname, port);
+    }
+
     private String getNCMDN(Socket socket) {
         try {
             return CertificateUtils.extractClientDNFromSSLSocket(socket);
@@ -103,21 +116,6 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
         }
     }
 
-    @Override
-    public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sendProtocolMessage(msg);
-    }
-
-    @Override
-    public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sendProtocolMessage(msg);
-    }
-
-    @Override
-    public void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sendProtocolMessage(msg);
-    }
-
     private Socket createSocket() {
         // determine the cluster manager's address
         final DiscoverableService service = clusterManagerProtocolServiceLocator.getService();
@@ -133,10 +131,18 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
         }
     }
 
-    private void sendProtocolMessage(final ProtocolMessage msg) {
+    public SocketConfiguration getSocketConfiguration() {
+        return socketConfiguration;
+    }
+
+    private void sendProtocolMessage(final ProtocolMessage msg, final String hostname, final int port) {
         Socket socket = null;
         try {
-            socket = createSocket();
+            try {
+                socket = SocketUtils.createSocket(new InetSocketAddress(hostname, port), socketConfiguration);
+            } catch (IOException e) {
+                throw new ProtocolException("Failed to send message to Cluster Coordinator due to: " + e, e);
+            }
 
             try {
                 // marshal message to output stream
@@ -149,9 +155,4 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
             SocketUtils.closeQuietly(socket);
         }
     }
-
-    public SocketConfiguration getSocketConfiguration() {
-        return socketConfiguration;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
index 0a9a064..3d0eb8e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
@@ -26,9 +26,7 @@ import org.apache.nifi.cluster.protocol.ProtocolListener;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.reporting.BulletinRepository;
 
 public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolListener {
@@ -83,27 +81,17 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL
     }
 
     @Override
-    public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sender.heartbeat(msg);
-    }
-
-    @Override
     public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
         return sender.requestConnection(msg);
     }
 
     @Override
-    public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sender.notifyControllerStartupFailure(msg);
-    }
-
-    @Override
-    public void notifyReconnectionFailure(final ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sender.notifyReconnectionFailure(msg);
+    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+        listener.setBulletinRepository(bulletinRepository);
     }
 
     @Override
-    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
-        listener.setBulletinRepository(bulletinRepository);
+    public void heartbeat(HeartbeatMessage msg, String address) throws ProtocolException {
+        sender.heartbeat(msg, address);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
index 4243b41..d42515e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
@@ -28,7 +28,6 @@ public class AdaptedConnectionResponse {
     private StandardDataFlow dataFlow;
     private NodeIdentifier nodeIdentifier;
     private String rejectionReason;
-    private boolean primary;
     private int tryLaterSeconds;
     private Integer managerRemoteInputPort;
     private Boolean managerRemoteCommsSecure;
@@ -71,14 +70,6 @@ public class AdaptedConnectionResponse {
         this.rejectionReason = rejectionReason;
     }
 
-    public boolean isPrimary() {
-        return primary;
-    }
-
-    public void setPrimary(boolean primary) {
-        this.primary = primary;
-    }
-
     public boolean shouldTryLater() {
         return tryLaterSeconds > 0;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
index cec3757..9501b48 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.cluster.protocol.jaxb.message;
 
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 
 /**
@@ -26,7 +28,7 @@ public class AdaptedHeartbeat {
     private NodeIdentifier nodeIdentifier;
     private byte[] payload;
     private boolean primary;
-    private boolean connected;
+    private NodeConnectionStatus connectionStatus;
 
     public AdaptedHeartbeat() {
     }
@@ -48,12 +50,12 @@ public class AdaptedHeartbeat {
         this.primary = primary;
     }
 
-    public boolean isConnected() {
-        return connected;
+    public void setConnectionStatus(NodeConnectionStatus connectionStatus) {
+        this.connectionStatus = connectionStatus;
     }
 
-    public void setConnected(boolean connected) {
-        this.connected = connected;
+    public NodeConnectionStatus getConnectionStatus() {
+        return connectionStatus;
     }
 
     public byte[] getPayload() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
new file mode 100644
index 0000000..f9ec3b1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+
+public class AdaptedNodeConnectionStatus {
+    private NodeConnectionState state;
+    private DisconnectionCode disconnectCode;
+    private String disconnectReason;
+    private Long connectionRequestTime;
+
+    public NodeConnectionState getState() {
+        return state;
+    }
+
+    public void setState(NodeConnectionState state) {
+        this.state = state;
+    }
+
+    public DisconnectionCode getDisconnectCode() {
+        return disconnectCode;
+    }
+
+    public void setDisconnectCode(DisconnectionCode disconnectCode) {
+        this.disconnectCode = disconnectCode;
+    }
+
+    public String getDisconnectReason() {
+        return disconnectReason;
+    }
+
+    public void setDisconnectReason(String disconnectReason) {
+        this.disconnectReason = disconnectReason;
+    }
+
+    public Long getConnectionRequestTime() {
+        return connectionRequestTime;
+    }
+
+    public void setConnectionRequestTime(Long connectionRequestTime) {
+        this.connectionRequestTime = connectionRequestTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
index b2c1c67..a1bc907 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
@@ -31,7 +31,6 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo
             aCr.setNodeIdentifier(cr.getNodeIdentifier());
             aCr.setTryLaterSeconds(cr.getTryLaterSeconds());
             aCr.setRejectionReason(cr.getRejectionReason());
-            aCr.setPrimary(cr.isPrimary());
             aCr.setManagerRemoteInputPort(cr.getManagerRemoteInputPort());
             aCr.setManagerRemoteCommsSecure(cr.isManagerRemoteCommsSecure());
             aCr.setInstanceId(cr.getInstanceId());
@@ -46,7 +45,7 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo
         } else if (aCr.getRejectionReason() != null) {
             return ConnectionResponse.createRejectionResponse(aCr.getRejectionReason());
         } else {
-            return new ConnectionResponse(aCr.getNodeIdentifier(), aCr.getDataFlow(), aCr.isPrimary(),
+            return new ConnectionResponse(aCr.getNodeIdentifier(), aCr.getDataFlow(),
                 aCr.getManagerRemoteInputPort(), aCr.isManagerRemoteCommsSecure(), aCr.getInstanceId());
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
index 412332f..2666abd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
@@ -25,7 +25,6 @@ public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> {
 
     @Override
     public AdaptedHeartbeat marshal(final Heartbeat hb) {
-
         final AdaptedHeartbeat aHb = new AdaptedHeartbeat();
 
         if (hb != null) {
@@ -39,7 +38,7 @@ public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> {
             aHb.setPrimary(hb.isPrimary());
 
             // set connected flag
-            aHb.setConnected(hb.isConnected());
+            aHb.setConnectionStatus(hb.getConnectionStatus());
         }
 
         return aHb;
@@ -47,7 +46,7 @@ public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> {
 
     @Override
     public Heartbeat unmarshal(final AdaptedHeartbeat aHb) {
-        return new Heartbeat(aHb.getNodeIdentifier(), aHb.isPrimary(), aHb.isConnected(), aHb.getPayload());
+        return new Heartbeat(aHb.getNodeIdentifier(), aHb.isPrimary(), aHb.getConnectionStatus(), aHb.getPayload());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
new file mode 100644
index 0000000..e2c302d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+
+public class NodeConnectionStatusAdapter extends XmlAdapter<AdaptedNodeConnectionStatus, NodeConnectionStatus> {
+
+    @Override
+    public NodeConnectionStatus unmarshal(final AdaptedNodeConnectionStatus adapted) throws Exception {
+        return new NodeConnectionStatus(adapted.getState(), adapted.getDisconnectCode(), adapted.getDisconnectReason(), adapted.getConnectionRequestTime());
+    }
+
+    @Override
+    public AdaptedNodeConnectionStatus marshal(final NodeConnectionStatus toAdapt) throws Exception {
+        final AdaptedNodeConnectionStatus adapted = new AdaptedNodeConnectionStatus();
+        adapted.setConnectionRequestTime(toAdapt.getConnectionRequestTime());
+        adapted.setDisconnectCode(toAdapt.getDisconnectCode());
+        adapted.setDisconnectReason(toAdapt.getDisconnectReason());
+        adapted.setState(toAdapt.getState());
+        return adapted;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index 25041ce..82df546 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -20,14 +20,13 @@ import javax.xml.bind.annotation.XmlRegistry;
 
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
@@ -44,10 +43,6 @@ public class ObjectFactory {
         return new ReconnectionRequestMessage();
     }
 
-    public ReconnectionFailureMessage createReconnectionFailureMessage() {
-        return new ReconnectionFailureMessage();
-    }
-
     public ReconnectionResponseMessage createReconnectionResponseMessage() {
         return new ReconnectionResponseMessage();
     }
@@ -88,7 +83,7 @@ public class ObjectFactory {
         return new MulticastProtocolMessage();
     }
 
-    public ControllerStartupFailureMessage createControllerStartupFailureMessage() {
-        return new ControllerStartupFailureMessage();
+    public NodeStatusChangeMessage createNodeStatusChangeMessage() {
+        return new NodeStatusChangeMessage();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java
deleted file mode 100644
index 3d3bd43..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.message;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
-
-/**
- */
-@XmlRootElement(name = "controllerStartupFailureMessage")
-public class ControllerStartupFailureMessage extends ExceptionMessage {
-
-    private NodeIdentifier nodeId;
-
-    public ControllerStartupFailureMessage() {
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.CONTROLLER_STARTUP_FAILURE;
-    }
-
-    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
-    public NodeIdentifier getNodeId() {
-        return nodeId;
-    }
-
-    public void setNodeId(NodeIdentifier nodeId) {
-        this.nodeId = nodeId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java
index 15432b1..b5b2ecb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java
@@ -16,9 +16,10 @@
  */
 package org.apache.nifi.cluster.protocol.message;
 
-import org.apache.nifi.cluster.protocol.Heartbeat;
 import javax.xml.bind.annotation.XmlRootElement;
 
+import org.apache.nifi.cluster.protocol.Heartbeat;
+
 /**
  */
 @XmlRootElement(name = "heartbeatMessage")
@@ -26,11 +27,6 @@ public class HeartbeatMessage extends ProtocolMessage {
 
     private Heartbeat heartbeat;
 
-    @Override
-    public MessageType getType() {
-        return MessageType.HEARTBEAT;
-    }
-
     public Heartbeat getHeartbeat() {
         return heartbeat;
     }
@@ -39,4 +35,8 @@ public class HeartbeatMessage extends ProtocolMessage {
         this.heartbeat = heartbeat;
     }
 
+    @Override
+    public MessageType getType() {
+        return MessageType.HEARTBEAT;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java
new file mode 100644
index 0000000..7a99d0e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * Message to indicate that the status of a node in the cluster has changed
+ */
+@XmlRootElement(name = "nodeStatusChange")
+public class NodeStatusChangeMessage extends ProtocolMessage {
+    private NodeConnectionStatus connectionStatus;
+    private NodeIdentifier nodeId;
+    private Long statusUpdateId = -1L;
+
+    @Override
+    public MessageType getType() {
+        return MessageType.NODE_STATUS_CHANGE;
+    }
+
+    public void setNodeConnectionStatus(final NodeConnectionStatus status) {
+        this.connectionStatus = status;
+    }
+
+    public NodeConnectionStatus getNodeConnectionStatus() {
+        return connectionStatus;
+    }
+
+    public NodeIdentifier getNodeId() {
+        return nodeId;
+    }
+
+    public void setNodeId(NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public Long getStatusUpdateIdentifier() {
+        return statusUpdateId;
+    }
+
+    public void setStatusUpdateIdentifier(Long statusUpdateId) {
+        this.statusUpdateId = statusUpdateId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index 5953e09..27be95f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -23,8 +23,6 @@ public abstract class ProtocolMessage {
     public static enum MessageType {
         CONNECTION_REQUEST,
         CONNECTION_RESPONSE,
-        CONTROLLER_STARTUP_FAILURE,
-        RECONNECTION_FAILURE,
         DISCONNECTION_REQUEST,
         EXCEPTION,
         FLOW_REQUEST,
@@ -34,6 +32,7 @@ public abstract class ProtocolMessage {
         RECONNECTION_REQUEST,
         RECONNECTION_RESPONSE,
         SERVICE_BROADCAST,
+        NODE_STATUS_CHANGE;
     }
 
     public abstract MessageType getType();

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java
deleted file mode 100644
index ce62c5b..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.message;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
-
-@XmlRootElement(name = "reconnectionFailureMessage")
-public class ReconnectionFailureMessage extends ExceptionMessage {
-
-    private NodeIdentifier nodeId;
-
-    public ReconnectionFailureMessage() {
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.RECONNECTION_FAILURE;
-    }
-
-    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
-    public NodeIdentifier getNodeId() {
-        return nodeId;
-    }
-
-    public void setNodeId(NodeIdentifier nodeId) {
-        this.nodeId = nodeId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
index 94c0a20..f9a986f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
@@ -28,7 +28,6 @@ import java.util.UUID;
 
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolContext;
 import org.apache.nifi.cluster.protocol.ProtocolException;
@@ -39,8 +38,6 @@ import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
 import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.PingMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.io.socket.ServerSocketConfiguration;
@@ -109,7 +106,7 @@ public class NodeProtocolSenderImplTest {
         when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
         ConnectionResponseMessage mockMessage = new ConnectionResponseMessage();
         mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier,
-            new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString()));
+            new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), null, null, UUID.randomUUID().toString()));
         when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage);
 
         ConnectionRequestMessage request = new ConnectionRequestMessage();
@@ -168,30 +165,4 @@ public class NodeProtocolSenderImplTest {
         fail("failed to throw exception");
 
     }
-
-    @Test
-    public void testHeartbeat() throws Exception {
-
-        when(mockServiceLocator.getService()).thenReturn(service);
-        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
-
-        HeartbeatMessage hb = new HeartbeatMessage();
-        hb.setHeartbeat(new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4, "localhost", 3821, false), false, false, new byte[] {1, 2, 3}));
-        sender.heartbeat(hb);
-    }
-
-    @Test
-    public void testNotifyControllerStartupFailure() throws Exception {
-
-        when(mockServiceLocator.getService()).thenReturn(service);
-        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
-
-        ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage();
-        msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1, "localhost", 3821, false));
-        msg.setExceptionMessage("some exception");
-        sender.notifyControllerStartupFailure(msg);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
index 1e1eae7..e5a1a7d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
@@ -134,6 +134,19 @@
             <groupId>org.springframework</groupId>
             <artifactId>spring-context</artifactId>
         </dependency>
+        
+        <!-- testing dependencies for ZooKeeper / Curator -->
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
+        
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
new file mode 100644
index 0000000..59ded24
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination;
+
+import java.util.Set;
+
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.reporting.Severity;
+
+/**
+ * <p>
+ * Responsible for coordinating nodes in the cluster
+ * </p>
+ */
+public interface ClusterCoordinator {
+
+    /**
+     * Sends a request to the node to connect to the cluster. This will immediately
+     * set the NodeConnectionStatus to DISCONNECTED.
+     *
+     * @param nodeId the identifier of the node
+     */
+    void requestNodeConnect(NodeIdentifier nodeId);
+
+    /**
+     * Indicates that the node has sent a valid heartbeat and should now
+     * be considered part of the cluster
+     *
+     * @param nodeId the identifier of the node
+     */
+    void finishNodeConnection(NodeIdentifier nodeId);
+
+    /**
+     * Sends a request to the node to disconnect from the cluster.
+     * The node will be marked as disconnected immediately.
+     *
+     * @param nodeId the identifier of the node
+     * @param disconnectionCode the code that represents why this node is being asked to disconnect
+     * @param explanation an explanation as to why the node is being asked to disconnect
+     *            from the cluster
+     */
+    void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation);
+
+    /**
+     * Notifies the Cluster Coordinator that the node with the given ID has requested to disconnect
+     * from the cluster. If no node exists in the cluster with the given ID, this method has no effect.
+     *
+     * @param nodeId the identifier of the node
+     * @param disconnectionCode the code that represents why this node is requesting to disconnect
+     * @param explanation an explanation as to why the node is requesting to disconnect from the cluster
+     */
+    void disconnectionRequestedByNode(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation);
+
+    /**
+     * Returns the current status of the node with the given identifier
+     *
+     * @param nodeId the identifier of the node
+     *
+     * @return the current status of the node with the given identifier,
+     *         or <code>null</code> if no node is known with the given identifier
+     */
+    NodeConnectionStatus getConnectionStatus(NodeIdentifier nodeId);
+
+    /**
+     * Returns the identifiers of all nodes that have the given connection state
+     *
+     * @param state the state
+     * @return the identifiers of all nodes that have the given connection state
+     */
+    Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState state);
+
+    /**
+     * Checks if the given hostname is blocked by the configured firewall, returning
+     * <code>true</code> if the node is blocked, <code>false</code> if the node is
+     * allowed through the firewall or if there is no firewall configured
+     *
+     * @param hostname the hostname of the node that is attempting to connect to the cluster
+     *
+     * @return <code>true</code> if the node is blocked, <code>false</code> if the node is
+     *         allowed through the firewall or if there is no firewall configured
+     */
+    boolean isBlockedByFirewall(String hostname);
+
+    /**
+     * Reports that some event occurred that is relevant to the cluster
+     *
+     * @param nodeId the identifier of the node that the event pertains to, or <code>null</code> if not applicable
+     * @param severity the severity of the event
+     * @param event an explanation of the event
+     */
+    void reportEvent(NodeIdentifier nodeId, Severity severity, String event);
+
+    /**
+     * Updates the node that is considered the Primary Node
+     *
+     * @param nodeId the id of the Primary Node
+     */
+    void setPrimaryNode(NodeIdentifier nodeId);
+
+    /**
+     * Returns the NodeIdentifier that has the given UUID, or <code>null</code> if no NodeIdentifier
+     * exists for the given UUID
+     *
+     * @param uuid the UUID of the NodeIdentifier to obtain
+     * @return the NodeIdentifier that has the given UUID, or <code>null</code> if no NodeIdentifier
+     *         exists for the given UUID
+     */
+    NodeIdentifier getNodeIdentifier(String uuid);
+}