You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/22 21:01:47 UTC

[1/4] nifi git commit: NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination

Repository: nifi
Updated Branches:
  refs/heads/master 9ea227567 -> fb7b3fe4b


http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index 4c0cbd0..c846df8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -19,7 +19,6 @@ package org.apache.nifi.controller.leader.election;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -29,8 +28,8 @@ import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
 import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryForever;
+import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
 import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.zookeeper.common.PathUtils;
 import org.slf4j.Logger;
@@ -40,10 +39,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
     private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class);
 
     private final FlowEngine leaderElectionMonitorEngine;
-    private final int sessionTimeoutMs;
-    private final int connectionTimeoutMs;
-    private final String rootPath;
-    private final String connectString;
+    private final ZooKeeperClientConfig zkConfig;
 
     private CuratorFramework curatorClient;
 
@@ -56,21 +52,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
         leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true);
 
         final NiFiProperties properties = NiFiProperties.getInstance();
-
-        connectString = properties.getProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING);
-        if (connectString == null || connectString.trim().isEmpty()) {
-            throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is not set in nifi.properties");
-        }
-
-        sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
-        connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT);
-        rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE);
-
-        try {
-            PathUtils.validatePath(rootPath);
-        } catch (final IllegalArgumentException e) {
-            throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath);
-        }
+        zkConfig = ZooKeeperClientConfig.createConfig(properties);
     }
 
 
@@ -83,7 +65,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
         stopped = false;
 
         final RetryPolicy retryPolicy = new RetryForever(5000);
-        curatorClient = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
+        curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy);
         curatorClient.start();
 
         // Call #register for each already-registered role. This will
@@ -96,16 +78,6 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
         logger.info("{} started", this);
     }
 
-    private int getTimePeriod(final NiFiProperties properties, final String propertyName, final String defaultValue) {
-        final String timeout = properties.getProperty(propertyName, defaultValue);
-        try {
-            return (int) FormatUtils.getTimeDuration(timeout, TimeUnit.MILLISECONDS);
-        } catch (final Exception e) {
-            logger.warn("Value of '" + propertyName + "' property is set to '" + timeout + "', which is not a valid time period. Using default of " + defaultValue);
-            return (int) FormatUtils.getTimeDuration(defaultValue, TimeUnit.MILLISECONDS);
-        }
-    }
-
 
     @Override
     public synchronized void register(final String roleName) {
@@ -122,6 +94,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
             return;
         }
 
+        final String rootPath = zkConfig.getRootPath();
         final String leaderPath = (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName;
 
         try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 3e4d3a3..b80cc4b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -120,8 +120,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
      *
      * @param task the task to perform
      */
-    public void submitFrameworkTask(final Runnable task) {
-        frameworkTaskExecutor.submit(task);
+    public Future<?> submitFrameworkTask(final Runnable task) {
+        return frameworkTaskExecutor.submit(task);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
index 3a8a8e4..ba1aa30 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
@@ -22,9 +22,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -108,22 +106,6 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
         .defaultValue(OPEN_TO_WORLD.getValue())
         .required(true)
         .build();
-    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
-        .name("Username")
-        .description("A Username that can be used to set Access Controls on ZooKeeper ZNodes. In order to apply any Access Controls to ZNodes, either a username and password must be set, "
-            + "or NiFi must be configured to communicate with ZooKeeper via Kerberos.")
-        .addValidator(Validator.VALID)
-        .required(false)
-        .build();
-    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
-        .name("Password")
-        .description("A password that can be used in conjunction with the Username property to set Access Controls on ZooKeeper ZNodes. "
-            + "In order to apply any Access Controls to ZNodes, either a username and password must be set, "
-            + "or NiFi must be configured to communicate with ZooKeeper via Kerberos.")
-        .addValidator(Validator.VALID)
-        .required(false)
-        .sensitive(true)
-        .build();
 
     private static final byte ENCODING_VERSION = 1;
 
@@ -148,45 +130,16 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
         properties.add(SESSION_TIMEOUT);
         properties.add(ROOT_NODE);
         properties.add(ACCESS_CONTROL);
-        properties.add(USERNAME);
-        properties.add(PASSWORD);
         return properties;
     }
 
 
     @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> validationFailures = new ArrayList<>();
-
-        final String username = validationContext.getProperty(USERNAME).getValue();
-        if (username != null && !username.trim().isEmpty()) {
-            final String password = validationContext.getProperty(PASSWORD).getValue();
-            if (password == null || password.trim().isEmpty()) {
-                validationFailures.add(new ValidationResult.Builder()
-                    .input("")
-                    .subject("Username and Password")
-                    .valid(false)
-                    .explanation("If the Username is set, the Password must also be set")
-                    .build());
-            }
-        }
-        return validationFailures;
-    }
-
-    @Override
     public synchronized void init(final StateProviderInitializationContext context) {
         connectionString = context.getProperty(CONNECTION_STRING).getValue();
         rootNode = context.getProperty(ROOT_NODE).getValue();
         timeoutMillis = context.getProperty(SESSION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
 
-        final String username = context.getProperty(USERNAME).getValue();
-        if (username == null) {
-            auth = null;
-        } else {
-            final String password = context.getProperty(PASSWORD).getValue();
-            auth = (username + ":" + password).getBytes(StandardCharsets.UTF_8);
-        }
-
         if (context.getProperty(ACCESS_CONTROL).getValue().equalsIgnoreCase(CREATOR_ONLY.getValue())) {
             acl = Ids.CREATOR_ALL_ACL;
         } else {
@@ -365,7 +318,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
                 keeper.setData(path, data, version);
             } catch (final NoNodeException nne) {
                 if (allowNodeCreation) {
-                    createNode(path, data, componentId, stateValues);
+                    createNode(path, data, componentId, stateValues, acl);
                     return;
                 } else {
                     throw nne;
@@ -396,7 +349,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
     }
 
 
-    private void createNode(final String path, final byte[] data, final String componentId, final Map<String, String> stateValues) throws IOException, KeeperException {
+    private void createNode(final String path, final byte[] data, final String componentId, final Map<String, String> stateValues, final List<ACL> acls) throws IOException, KeeperException {
         try {
             if (data != null && data.length > ONE_MB) {
                 throw new StateTooLargeException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId
@@ -404,20 +357,20 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
                     + " bytes, and the maximum allowed by ZooKeeper is 1 MB (" + ONE_MB + " bytes)");
             }
 
-            getZooKeeper().create(path, data, acl, CreateMode.PERSISTENT);
+            getZooKeeper().create(path, data, acls, CreateMode.PERSISTENT);
         } catch (final InterruptedException ie) {
             throw new IOException("Failed to update cluster-wide state due to interruption", ie);
         } catch (final KeeperException ke) {
             final Code exceptionCode = ke.code();
             if (Code.NONODE == exceptionCode) {
                 final String parentPath = StringUtils.substringBeforeLast(path, "/");
-                createNode(parentPath, null, componentId, stateValues);
-                createNode(path, data, componentId, stateValues);
+                createNode(parentPath, null, componentId, stateValues, Ids.OPEN_ACL_UNSAFE);
+                createNode(path, data, componentId, stateValues, acls);
                 return;
             }
             if (Code.SESSIONEXPIRED == exceptionCode) {
                 invalidateClient();
-                createNode(path, data, componentId, stateValues);
+                createNode(path, data, componentId, stateValues, acls);
                 return;
             }
 
@@ -429,7 +382,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
                 } catch (final KeeperException ke1) {
                     // Node no longer exists -- it was removed by someone else. Go recreate the node.
                     if (ke1.code() == Code.NONODE) {
-                        createNode(path, data, componentId, stateValues);
+                        createNode(path, data, componentId, stateValues, acls);
                         return;
                     }
                 } catch (final InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
index 0fc9ec2..4c4b418 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
@@ -54,11 +54,17 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
     }
 
     public synchronized void start() throws IOException {
+        if (started) {
+            return;
+        }
+
         if (quorumPeerConfig.isDistributed()) {
             startDistributed();
         } else {
             startStandalone();
         }
+
+        started = true;
     }
 
     private void startStandalone() throws IOException {
@@ -67,8 +73,6 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
         final ServerConfig config = new ServerConfig();
         config.readFrom(quorumPeerConfig);
         try {
-            started = true;
-
             transactionLog = new FileTxnSnapLog(new File(config.getDataLogDir()), new File(config.getDataDir()));
 
             embeddedZkServer = new ZooKeeperServer();
@@ -94,8 +98,6 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
         logger.info("Starting Embedded ZooKeeper Peer");
 
         try {
-            started = true;
-
             transactionLog = new FileTxnSnapLog(new File(quorumPeerConfig.getDataLogDir()), new File(quorumPeerConfig.getDataDir()));
 
             connectionFactory = ServerCnxnFactory.createFactory();

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
index ef72d6c..f8015d2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
@@ -17,13 +17,8 @@
 
 package org.apache.nifi.controller.state.providers.zookeeper;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import javax.net.ssl.SSLContext;
@@ -36,11 +31,6 @@ import org.apache.nifi.components.state.StateProvider;
 import org.apache.nifi.components.state.StateProviderInitializationContext;
 import org.apache.nifi.components.state.exception.StateTooLargeException;
 import org.apache.nifi.controller.state.providers.AbstractTestStateProvider;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooDefs.Perms;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -126,83 +116,6 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
         return provider;
     }
 
-    @Test
-    public void testWithUsernameAndPasswordCreatorOnly() throws Exception {
-        final Map<PropertyDescriptor, String> properties = new HashMap<>(defaultProperties);
-        properties.put(ZooKeeperStateProvider.CONNECTION_STRING, zkServer.getConnectString());
-        properties.put(ZooKeeperStateProvider.USERNAME, "nifi");
-        properties.put(ZooKeeperStateProvider.PASSWORD, "nifi");
-        properties.put(ZooKeeperStateProvider.ACCESS_CONTROL, ZooKeeperStateProvider.CREATOR_ONLY.getValue());
-
-        final ZooKeeperStateProvider authorizedProvider = createProvider(properties);
-
-        try {
-            final Map<String, String> state = new HashMap<>();
-            state.put("testWithUsernameAndPasswordCreatorOnly", "my value");
-            authorizedProvider.setState(state, componentId);
-
-            final List<ACL> acls = authorizedProvider.getZooKeeper().getACL(properties.get(ZooKeeperStateProvider.ROOT_NODE) + "/components/" + componentId, new Stat());
-            assertNotNull(acls);
-            assertEquals(1, acls.size());
-            final ACL acl = acls.get(0);
-            assertEquals(Perms.ALL, acl.getPerms());
-            // ID is our username:<SHA1 hash>
-            assertEquals("nifi:RuSeH3tpzgba3p9WrG/UpiSIsGg=", acl.getId().getId());
-
-            final Map<String, String> stateValues = authorizedProvider.getState(componentId).toMap();
-            assertEquals(state, stateValues);
-
-            // ensure that our default provider cannot access the data, since it has not authenticated
-            try {
-                this.provider.getState(componentId);
-                Assert.fail("Expected an IOException but it wasn't thrown");
-            } catch (final IOException ioe) {
-                final Throwable cause = ioe.getCause();
-                assertTrue(cause instanceof KeeperException);
-                final KeeperException ke = (KeeperException) cause;
-                assertEquals(Code.NOAUTH, ke.code());
-            }
-        } finally {
-            authorizedProvider.onComponentRemoved(componentId);
-            authorizedProvider.disable();
-            authorizedProvider.shutdown();
-        }
-    }
-
-    @Test
-    public void testWithUsernameAndPasswordOpen() throws Exception {
-        final Map<PropertyDescriptor, String> properties = new HashMap<>(defaultProperties);
-        properties.put(ZooKeeperStateProvider.CONNECTION_STRING, zkServer.getConnectString());
-        properties.put(ZooKeeperStateProvider.USERNAME, "nifi");
-        properties.put(ZooKeeperStateProvider.PASSWORD, "nifi");
-        properties.put(ZooKeeperStateProvider.ACCESS_CONTROL, ZooKeeperStateProvider.OPEN_TO_WORLD.getValue());
-
-        final ZooKeeperStateProvider authorizedProvider = createProvider(properties);
-
-        try {
-            final Map<String, String> state = new HashMap<>();
-            state.put("testWithUsernameAndPasswordOpen", "my value");
-            authorizedProvider.setState(state, componentId);
-
-            final List<ACL> acls = authorizedProvider.getZooKeeper().getACL(properties.get(ZooKeeperStateProvider.ROOT_NODE) + "/components/" + componentId, new Stat());
-            assertNotNull(acls);
-            assertEquals(1, acls.size());
-            final ACL acl = acls.get(0);
-            assertEquals(Perms.ALL, acl.getPerms());
-            assertEquals("anyone", acl.getId().getId());
-
-            final Map<String, String> stateValues = authorizedProvider.getState(componentId).toMap();
-            assertEquals(state, stateValues);
-
-            // ensure that our default provider can also access the data, since it has not authenticated
-            final Map<String, String> unauthStateValues = this.provider.getState(componentId).toMap();
-            assertEquals(state, unauthStateValues);
-        } finally {
-            authorizedProvider.onComponentRemoved(componentId);
-            authorizedProvider.disable();
-            authorizedProvider.shutdown();
-        }
-    }
 
     @Test
     public void testStateTooLargeExceptionThrown() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
index 36f42d7..e310974 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
@@ -86,8 +86,15 @@
     <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
     
     <logger name="org.apache.nifi" level="INFO"/>
+    <logger name="org.apache.nifi.processors" level="WARN"/>
     <logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
     
+    
+    <logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
+    <logger name="org.apache.zookeeper.server.NIOServerCnxn" level="ERROR" />
+    <logger name="org.apache.zookeeper.server.NIOServerCnxnFactory" level="ERROR" />
+    <logger name="org.apache.zookeeper.ZooKeeper" level="ERROR" />
+    
     <!-- Logger for managing logging statements for nifi clusters. -->
     <logger name="org.apache.nifi.cluster" level="INFO"/>
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index f7912a1..cf9c433 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -169,6 +169,7 @@ nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string}
 nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout}
 nifi.zookeeper.session.timeout=${nifi.zookeeper.session.timeout}
 nifi.zookeeper.root.node=${nifi.zookeeper.root.node}
+nifi.zookeeper.access.control=${nifi.zookeeper.access.control}
 
 # cluster manager properties (only configure for cluster manager) #
 nifi.cluster.is.manager=${nifi.cluster.is.manager}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
index 665b22b..d4a13cf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
@@ -46,23 +46,13 @@
                             - Open  : ZNodes will be open to any ZooKeeper client.
                             - CreatorOnly  : ZNodes will be accessible only by the creator. The creator will have full access to create children, read, write, delete, and administer the ZNodes.
                                              This option is available only if access to ZooKeeper is secured via Kerberos or if a Username and Password are set.
-
-        Username - An optional username that can be used to assign Access Controls to ZNodes. ZooKeeper allows users to assign arbitrary usernames and passwords to ZNodes. These usernames
-                   and passwords are not explicitly defined elsewhere but are simply associated with ZNodes, so it is important that all NiFi nodes in a cluster have the same value for the
-                   Username and Password properties.
-
-        Password - An optional password that can be used to assign Access Controls to ZNodes. This property must be set if the Username property is set. NOTE: ZooKeeper transmits passwords
-                   in plain text. As a result, a Username and Password should be used only if communicate with a ZooKeeper on a localhost or over encrypted comms (such as configuring SSL
-                   communications with ZooKeeper).
     -->
     <cluster-provider>
         <id>zk-provider</id>
         <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
         <property name="Connect String"></property>
         <property name="Root Node">/nifi</property>
-        <property name="Session Timeout">30 seconds</property>
-        <property name="Access Control">CreatorOnly</property>
-        <property name="Username">nifi</property>
-        <property name="Password">nifi</property>
+        <property name="Session Timeout">10 seconds</property>
+        <property name="Access Control">Open</property>
     </cluster-provider>
 </stateManagement>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 0969eac..dae1ab6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -27,6 +27,7 @@ import org.apache.nifi.admin.service.KeyService;
 import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.cluster.context.ClusterContext;
 import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
+import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.node.Node;
@@ -549,7 +550,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         }
 
         final String nodeId = nodeDTO.getNodeId();
-        return dtoFactory.createNodeDTO(clusterManager.getNode(nodeId), clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId));
+
+        NodeIdentifier nodeIdentifier = null;
+        for (final Node node : clusterManager.getNodes()) {
+            if (node.getNodeId().getId().equals(nodeId)) {
+                nodeIdentifier = node.getNodeId();
+                break;
+            }
+        }
+
+        final NodeHeartbeat nodeHeartbeat = nodeIdentifier == null ? null : clusterManager.getLatestHeartbeat(nodeIdentifier);
+        return dtoFactory.createNodeDTO(clusterManager.getNode(nodeId), nodeHeartbeat, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId));
     }
 
     @Override
@@ -1923,7 +1934,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         for (final Node node : clusterManager.getNodes()) {
             // create and add node dto
             final String nodeId = node.getNodeId().getId();
-            nodeDtos.add(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
+            nodeDtos.add(dtoFactory.createNodeDTO(node, clusterManager.getLatestHeartbeat(node.getNodeId()), clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
         }
 
         return clusterDto;
@@ -1935,7 +1946,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         if (node == null) {
             throw new UnknownNodeException("Node does not exist.");
         } else {
-            return dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId));
+            return dtoFactory.createNodeDTO(node, clusterManager.getLatestHeartbeat(node.getNodeId()), clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
index 34299ac..849d9f2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
@@ -620,7 +620,7 @@ public class ControllerServiceResource extends ApplicationResource {
 
         // replicate if cluster manager
         if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
-            return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), updateClientId(updateReferenceRequest), getHeaders()).getResponse();
         }
 
         // handle expects request (usually from the cluster manager)

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 90acfb8..79e1d55 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -35,7 +35,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.authorization.Resource;
-import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
 import org.apache.nifi.cluster.event.Event;
 import org.apache.nifi.cluster.manager.StatusMerger;
 import org.apache.nifi.cluster.node.Node;
@@ -2545,8 +2545,8 @@ public final class DtoFactory {
         return revisionDTO;
     }
 
-    public NodeDTO createNodeDTO(Node node, List<Event> events, boolean primary) {
 
+    public NodeDTO createNodeDTO(Node node, NodeHeartbeat nodeHeartbeat, List<Event> events, boolean primary) {
         final NodeDTO nodeDto = new NodeDTO();
 
         // populate node dto
@@ -2560,16 +2560,12 @@ public final class DtoFactory {
         nodeDto.setConnectionRequested(connectionRequested);
 
         // only connected nodes have heartbeats
-        if (node.getHeartbeat() != null) {
-            final Date heartbeat = new Date(node.getHeartbeat().getCreatedTimestamp());
+        if (nodeHeartbeat != null) {
+            final Date heartbeat = new Date(nodeHeartbeat.getTimestamp());
             nodeDto.setHeartbeat(heartbeat);
-        }
-
-        final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
-        if (nodeHeartbeatPayload != null) {
-            nodeDto.setNodeStartTime(new Date(nodeHeartbeatPayload.getSystemStartTime()));
-            nodeDto.setActiveThreadCount(nodeHeartbeatPayload.getActiveThreadCount());
-            nodeDto.setQueued(FormatUtils.formatCount(nodeHeartbeatPayload.getTotalFlowFileCount()) + " / " + FormatUtils.formatDataSize(nodeHeartbeatPayload.getTotalFlowFileBytes()));
+            nodeDto.setNodeStartTime(new Date(nodeHeartbeat.getSystemStartTime()));
+            nodeDto.setActiveThreadCount(nodeHeartbeat.getActiveThreadCount());
+            nodeDto.setQueued(FormatUtils.formatCount(nodeHeartbeat.getFlowFileCount()) + " / " + FormatUtils.formatDataSize(nodeHeartbeat.getFlowFileBytes()));
         }
 
         // populate node events


[4/4] nifi git commit: NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination

Posted by mc...@apache.org.
NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination

Added configuration options for ZooKeeper username & password for heartbeat management. Also addressed issue where nodes that were previously disconnected were asked to disconnect upon restart

Ensure that ACL is set properly when creating heartbeat node. Removed unused ControllerStartupFailureMessage.java

Changed ZooKeeper ACL's so that container nodes that would not be sensitive are wide open and removed the usage of username & password when communicating with ZooKeeper. This was done specifically because username/password combination is considered a 'testing' feature that should not be used in production and is not supported by Apache Curator

Refactored CuratorHeartbeatMonitor into an abstract heartbeat monitor that is responsible for processing heartbeats and CuratorHeartbeatMonitor that is responsible for retrieving heartbeat information

Refactored so that heartbeats are sent to Cluster Coordinator directly instead of to ZooKeeper. ZooKeeper is used to know which node is the cluster coordinator but heartbeats to the Cluster Coordinator provide additional information about the nodes.

Code cleanup and incorporate comments from peer review

This closes #323


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fb7b3fe4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fb7b3fe4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fb7b3fe4

Branch: refs/heads/master
Commit: fb7b3fe4b87b8c9f45a43d460a3020947ef55dcc
Parents: 9ea2275
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Mar 24 11:49:08 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Apr 22 15:01:04 2016 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   1 +
 .../org/apache/nifi/util/NiFiProperties.java    |   1 +
 .../coordination/node/DisconnectionCode.java    |  75 ++++
 .../coordination/node/NodeConnectionState.java  |  50 +++
 .../coordination/node/NodeConnectionStatus.java |  94 ++++
 .../protocol/ClusterManagerProtocolSender.java  |  11 +
 .../cluster/protocol/ConnectionResponse.java    |  10 +-
 .../apache/nifi/cluster/protocol/Heartbeat.java |  12 +-
 .../nifi/cluster/protocol/NodeIdentifier.java   |  25 +-
 .../cluster/protocol/NodeProtocolSender.java    |  35 +-
 .../impl/ClusterManagerProtocolSenderImpl.java  |  44 ++
 .../ClusterManagerProtocolSenderListener.java   |   8 +
 .../protocol/impl/NodeProtocolSenderImpl.java   |  49 ++-
 .../impl/NodeProtocolSenderListener.java        |  20 +-
 .../jaxb/message/AdaptedConnectionResponse.java |   9 -
 .../protocol/jaxb/message/AdaptedHeartbeat.java |  12 +-
 .../message/AdaptedNodeConnectionStatus.java    |  60 +++
 .../jaxb/message/ConnectionResponseAdapter.java |   3 +-
 .../protocol/jaxb/message/HeartbeatAdapter.java |   5 +-
 .../message/NodeConnectionStatusAdapter.java    |  40 ++
 .../protocol/jaxb/message/ObjectFactory.java    |  11 +-
 .../ControllerStartupFailureMessage.java        |  49 ---
 .../protocol/message/HeartbeatMessage.java      |  12 +-
 .../message/NodeStatusChangeMessage.java        |  62 +++
 .../protocol/message/ProtocolMessage.java       |   3 +-
 .../message/ReconnectionFailureMessage.java     |  47 --
 .../impl/NodeProtocolSenderImplTest.java        |  31 +-
 .../nifi-framework-cluster/pom.xml              |  13 +
 .../coordination/ClusterCoordinator.java        | 127 ++++++
 .../heartbeat/AbstractHeartbeatMonitor.java     | 283 ++++++++++++
 .../ClusterProtocolHeartbeatMonitor.java        | 241 ++++++++++
 .../heartbeat/HeartbeatMonitor.java             |  55 +++
 .../coordination/heartbeat/NodeHeartbeat.java   |  64 +++
 .../heartbeat/StandardNodeHeartbeat.java        |  98 +++++
 .../cluster/coordination/node/ClusterNode.java  |  39 ++
 .../nifi/cluster/manager/ClusterManager.java    |   9 -
 .../cluster/manager/impl/WebClusterManager.java | 437 ++-----------------
 .../impl/WebClusterManagerCoordinator.java      | 246 +++++++++++
 .../java/org/apache/nifi/cluster/node/Node.java |  66 +--
 .../heartbeat/TestAbstractHeartbeatMonitor.java | 350 +++++++++++++++
 .../nifi/cluster/ConnectionException.java       |   4 +
 .../apache/nifi/controller/FlowController.java  | 303 ++++++++-----
 .../FlowSynchronizationException.java           |   4 +-
 .../nifi/controller/StandardFlowService.java    |  92 ++--
 .../controller/UninheritableFlowException.java  |   4 +-
 .../nifi/controller/cluster/Heartbeater.java    |  41 ++
 .../cluster/ZooKeeperClientConfig.java          | 117 +++++
 .../cluster/ZooKeeperHeartbeater.java           | 117 +++++
 .../election/CuratorLeaderElectionManager.java  |  37 +-
 .../scheduling/StandardProcessScheduler.java    |   4 +-
 .../zookeeper/ZooKeeperStateProvider.java       |  61 +--
 .../state/server/ZooKeeperStateServer.java      |  10 +-
 .../zookeeper/TestZooKeeperStateProvider.java   |  87 ----
 .../src/main/resources/conf/logback.xml         |   7 +
 .../src/main/resources/conf/nifi.properties     |   1 +
 .../main/resources/conf/state-management.xml    |  14 +-
 .../nifi/web/StandardNiFiServiceFacade.java     |  17 +-
 .../nifi/web/api/ControllerServiceResource.java |   2 +-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  18 +-
 59 files changed, 2688 insertions(+), 1059 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index e85c83f..7f92a2a 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -457,6 +457,7 @@ language governing permissions and limitations under the License. -->
         <nifi.zookeeper.connect.timeout>3 secs</nifi.zookeeper.connect.timeout>
         <nifi.zookeeper.session.timeout>3 secs</nifi.zookeeper.session.timeout>
         <nifi.zookeeper.root.node>/nifi</nifi.zookeeper.root.node>
+        <nifi.zookeeper.access.control>Open</nifi.zookeeper.access.control>
 
         <!-- nifi.properties: kerberos properties -->
         <nifi.kerberos.krb5.file> </nifi.kerberos.krb5.file>

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 63693bf..a6387ad 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -172,6 +172,7 @@ public class NiFiProperties extends Properties {
     public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout";
     public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout";
     public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node";
+    public static final String ZOOKEEPER_ACCESS_CONTROL = "nifi.zookeeper.access.control";
 
     // cluster manager properties
     public static final String CLUSTER_IS_MANAGER = "nifi.cluster.is.manager";

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
new file mode 100644
index 0000000..bd20c3f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+/**
+ * An enumeration of the reasons that a node may be disconnected
+ * from the cluster
+ */
+public enum DisconnectionCode {
+    /**
+     * The node was disconnected for an unreported reason
+     */
+    UNKNOWN("Unknown Reason"),
+
+    /**
+     * The node has not yet connected to the cluster
+     */
+    NOT_YET_CONNECTED("Not Has Not Yet Connected to Cluster"),
+
+    /**
+     * A user explicitly disconnected the node from the cluster
+     */
+    USER_DISCONNECTED("User Disconnected Node"),
+
+    /**
+     * The node was disconnected because it stopped heartbeating
+     */
+    LACK_OF_HEARTBEAT("Lack of Heartbeat"),
+
+    /**
+     * The firewall prevented the node from joining the cluster
+     */
+    BLOCKED_BY_FIREWALL("Blocked by Firewall"),
+
+    /**
+     * The node failed to startup properly
+     */
+    STARTUP_FAILURE("Node Failed to Startup Properly"),
+
+    /**
+     * The node's flow did not match the cluster's flow
+     */
+    MISMATCHED_FLOWS("Node's Flow did not Match Cluster Flow"),
+
+    /**
+     * Node is being shut down
+     */
+    NODE_SHUTDOWN("Node was Shutdown");
+
+    private final String description;
+
+    private DisconnectionCode(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String toString() {
+        return description;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java
new file mode 100644
index 0000000..9627b2d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+import javax.xml.bind.annotation.XmlEnum;
+
+@XmlEnum(String.class)
+public enum NodeConnectionState {
+    /**
+     * A node has issued a connection request to the cluster, but has not yet
+     * sent a heartbeat. A connecting node can transition to DISCONNECTED or CONNECTED. The cluster
+     * will not accept any external requests to change the flow while any node is in
+     * this state.
+     */
+    CONNECTING,
+
+    /**
+     * A node that is connected to the cluster. A connecting node transitions
+     * to connected after the cluster receives the node's first heartbeat. A
+     * connected node can transition to disconnecting.
+     */
+    CONNECTED,
+
+    /**
+     * A node that is in the process of disconnecting from the cluster.
+     * A DISCONNECTING node will always transition to DISCONNECTED.
+     */
+    DISCONNECTING,
+
+    /**
+     * A node that is not connected to the cluster.
+     * A DISCONNECTED node can transition to CONNECTING.
+     */
+    DISCONNECTED
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
new file mode 100644
index 0000000..cd2a6a7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeConnectionStatusAdapter;
+
+/**
+ * Describes the current status of a node
+ */
+@XmlJavaTypeAdapter(NodeConnectionStatusAdapter.class)
+public class NodeConnectionStatus {
+    private final NodeConnectionState state;
+    private final DisconnectionCode disconnectCode;
+    private final String disconnectReason;
+    private final Long connectionRequestTime;
+
+    public NodeConnectionStatus(final NodeConnectionState state) {
+        this(state, null, null, null);
+    }
+
+    public NodeConnectionStatus(final NodeConnectionState state, final long connectionRequestTime) {
+        this(state, null, null, connectionRequestTime);
+    }
+
+    public NodeConnectionStatus(final DisconnectionCode disconnectionCode) {
+        this(NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionCode.name(), null);
+    }
+
+    public NodeConnectionStatus(final DisconnectionCode disconnectionCode, final String disconnectionExplanation) {
+        this(NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionExplanation, null);
+    }
+
+    public NodeConnectionStatus(final NodeConnectionState state, final DisconnectionCode disconnectionCode) {
+        this(state, disconnectionCode, disconnectionCode.name(), null);
+    }
+
+    public NodeConnectionStatus(final NodeConnectionState state, final DisconnectionCode disconnectCode, final String disconnectReason, final Long connectionRequestTime) {
+        this.state = state;
+        if (state == NodeConnectionState.DISCONNECTED && disconnectCode == null) {
+            this.disconnectCode = DisconnectionCode.UNKNOWN;
+            this.disconnectReason = this.disconnectCode.toString();
+        } else {
+            this.disconnectCode = disconnectCode;
+            this.disconnectReason = disconnectReason;
+        }
+
+        this.connectionRequestTime = connectionRequestTime;
+    }
+
+    public NodeConnectionState getState() {
+        return state;
+    }
+
+    public DisconnectionCode getDisconnectCode() {
+        return disconnectCode;
+    }
+
+    public String getDisconnectReason() {
+        return disconnectReason;
+    }
+
+    public Long getConnectionRequestTime() {
+        return connectionRequestTime;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        final NodeConnectionState state = getState();
+        sb.append("NodeConnectionStatus[state=").append(state);
+        if (state == NodeConnectionState.DISCONNECTED || state == NodeConnectionState.DISCONNECTING) {
+            sb.append(", Disconnect Code=").append(getDisconnectCode()).append(", Disconnect Reason=").append(getDisconnectReason());
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
index bdefbbf..010aed7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
@@ -16,9 +16,12 @@
  */
 package org.apache.nifi.cluster.protocol;
 
+import java.util.Set;
+
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.reporting.BulletinRepository;
@@ -61,4 +64,12 @@ public interface ClusterManagerProtocolSender {
      * @param bulletinRepository repo
      */
     void setBulletinRepository(final BulletinRepository bulletinRepository);
+
+    /**
+     * Notifies all nodes in the given set that a node in the cluster has a new status
+     *
+     * @param nodesToNotify the nodes that should be notified of the change
+     * @param msg the message that indicates which node's status changed and what it changed to
+     */
+    void notifyNodeStatusChange(Set<NodeIdentifier> nodesToNotify, NodeStatusChangeMessage msg);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
index 96bde72..e6d8cf6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
@@ -35,14 +35,13 @@ public class ConnectionResponse {
     private final int tryLaterSeconds;
     private final NodeIdentifier nodeIdentifier;
     private final StandardDataFlow dataFlow;
-    private final boolean primary;
     private final Integer managerRemoteInputPort;
     private final Boolean managerRemoteCommsSecure;
     private final String instanceId;
 
     private volatile String clusterManagerDN;
 
-    public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary,
+    public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow,
         final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) {
         if (nodeIdentifier == null) {
             throw new IllegalArgumentException("Node identifier may not be empty or null.");
@@ -53,7 +52,6 @@ public class ConnectionResponse {
         this.dataFlow = dataFlow;
         this.tryLaterSeconds = 0;
         this.rejectionReason = null;
-        this.primary = primary;
         this.managerRemoteInputPort = managerRemoteInputPort;
         this.managerRemoteCommsSecure = managerRemoteCommsSecure;
         this.instanceId = instanceId;
@@ -67,7 +65,6 @@ public class ConnectionResponse {
         this.nodeIdentifier = null;
         this.tryLaterSeconds = tryLaterSeconds;
         this.rejectionReason = null;
-        this.primary = false;
         this.managerRemoteInputPort = null;
         this.managerRemoteCommsSecure = null;
         this.instanceId = null;
@@ -78,7 +75,6 @@ public class ConnectionResponse {
         this.nodeIdentifier = null;
         this.tryLaterSeconds = 0;
         this.rejectionReason = rejectionReason;
-        this.primary = false;
         this.managerRemoteInputPort = null;
         this.managerRemoteCommsSecure = null;
         this.instanceId = null;
@@ -96,10 +92,6 @@ public class ConnectionResponse {
         return new ConnectionResponse(explanation);
     }
 
-    public boolean isPrimary() {
-        return primary;
-    }
-
     public boolean shouldTryLater() {
         return tryLaterSeconds > 0;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
index 80a4ba7..f2b0fde 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
@@ -19,6 +19,8 @@ package org.apache.nifi.cluster.protocol;
 import java.util.Date;
 import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter;
 
 /**
@@ -30,17 +32,17 @@ public class Heartbeat {
 
     private final NodeIdentifier nodeIdentifier;
     private final boolean primary;
-    private final boolean connected;
+    private final NodeConnectionStatus connectionStatus;
     private final long createdTimestamp;
     private final byte[] payload;
 
-    public Heartbeat(final NodeIdentifier nodeIdentifier, final boolean primary, final boolean connected, final byte[] payload) {
+    public Heartbeat(final NodeIdentifier nodeIdentifier, final boolean primary, final NodeConnectionStatus connectionStatus, final byte[] payload) {
         if (nodeIdentifier == null) {
             throw new IllegalArgumentException("Node Identifier may not be null.");
         }
         this.nodeIdentifier = nodeIdentifier;
         this.primary = primary;
-        this.connected = connected;
+        this.connectionStatus = connectionStatus;
         this.payload = payload;
         this.createdTimestamp = new Date().getTime();
     }
@@ -57,8 +59,8 @@ public class Heartbeat {
         return primary;
     }
 
-    public boolean isConnected() {
-        return connected;
+    public NodeConnectionStatus getConnectionStatus() {
+        return connectionStatus;
     }
 
     @XmlTransient

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
index cb4dd6a..568fc20 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
@@ -16,6 +16,10 @@
  */
 package org.apache.nifi.cluster.protocol;
 
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
 import org.apache.commons.lang3.StringUtils;
 
 /**
@@ -32,6 +36,8 @@ import org.apache.commons.lang3.StringUtils;
  * @Immutable
  * @Threadsafe
  */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
 public class NodeIdentifier {
     /**
      * the unique identifier for the node
@@ -110,6 +116,21 @@ public class NodeIdentifier {
         this.siteToSiteSecure = siteToSiteSecure;
     }
 
+    /**
+     * This constructor should not be used and exists solely for the use of JAXB
+     */
+    public NodeIdentifier() {
+        this.id = null;
+        this.apiAddress = null;
+        this.apiPort = 0;
+        this.socketAddress = null;
+        this.socketPort = 0;
+        this.nodeDn = null;
+        this.siteToSiteAddress = null;
+        this.siteToSitePort = null;
+        this.siteToSiteSecure = false;
+    }
+
     public String getId() {
         return id;
     }
@@ -213,9 +234,7 @@ public class NodeIdentifier {
 
     @Override
     public String toString() {
-        return "[" + "id=" + id + ", apiAddress=" + apiAddress + ", apiPort=" + apiPort
-            + ", socketAddress=" + socketAddress + ", socketPort=" + socketPort
-            + ", siteToSiteAddress=" + siteToSiteAddress + ", siteToSitePort=" + siteToSitePort + ']';
+        return apiAddress + ":" + apiPort;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
index be0c339..432a03d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
@@ -18,9 +18,7 @@ package org.apache.nifi.cluster.protocol;
 
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 
 /**
  * An interface for sending protocol messages from a node to the cluster
@@ -41,34 +39,11 @@ public interface NodeProtocolSender {
     ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException;
 
     /**
-     * Sends a "heartbeat" message to the cluster manager.
+     * Sends a heartbeat to the address given
      *
-     * @param msg a message
-     * @throws UnknownServiceAddressException if the cluster manager's address
-     * is not known
-     * @throws ProtocolException if communication failed
+     * @param msg the heartbeat message to send
+     * @param address the address of the Cluster Coordinator in &lt;hostname&gt;:&lt;port&gt; format
+     * @throws ProtocolException if unable to send the heartbeat
      */
-    void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
-    /**
-     * Sends a failure notification if the controller was unable start.
-     *
-     * @param msg a message
-     * @throws UnknownServiceAddressException if the cluster manager's address
-     * is not known
-     * @throws ProtocolException if communication failed
-     */
-    void notifyControllerStartupFailure(ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
-    /**
-     * Sends a failure notification if the node was unable to reconnect to the
-     * cluster
-     *
-     * @param msg a message
-     * @throws UnknownServiceAddressException if the cluster manager's address
-     * is not known
-     * @throws ProtocolException if communication failed
-     */
-    void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
+    void heartbeat(HeartbeatMessage msg, String address) throws ProtocolException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
index fb9292e..63ba5ba 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
@@ -16,10 +16,14 @@
  */
 package org.apache.nifi.cluster.protocol.impl;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
@@ -31,6 +35,7 @@ import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@@ -39,6 +44,7 @@ import org.apache.nifi.io.socket.SocketConfiguration;
 import org.apache.nifi.io.socket.SocketUtils;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
 
 /**
  * A protocol sender for sending protocol messages from the cluster manager to
@@ -216,4 +222,42 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
             throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
         }
     }
+
+    @Override
+    public void notifyNodeStatusChange(final Set<NodeIdentifier> nodesToNotify, final NodeStatusChangeMessage msg) {
+        final NiFiProperties properties = NiFiProperties.getInstance();
+        final int numThreads = Math.min(nodesToNotify.size(), properties.getClusterManagerProtocolThreads());
+
+        final byte[] msgBytes;
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+            marshaller.marshal(msg, baos);
+            msgBytes = baos.toByteArray();
+        } catch (final IOException e) {
+            throw new ProtocolException("Failed to marshal NodeStatusChangeMessage", e);
+        }
+
+        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+        for (final NodeIdentifier nodeId : nodesToNotify) {
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try (final Socket socket = createSocket(nodeId, true)) {
+                        // marshal message to output stream
+                        socket.getOutputStream().write(msgBytes);
+                    } catch (final IOException ioe) {
+                        throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, ioe);
+                    }
+                }
+            });
+        }
+
+        executor.shutdown();
+
+        try {
+            executor.awaitTermination(10, TimeUnit.DAYS);
+        } catch (final InterruptedException ie) {
+            throw new ProtocolException(ie);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
index 54d33a8..2fc05b0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
@@ -18,14 +18,17 @@ package org.apache.nifi.cluster.protocol.impl;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Set;
 
 import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.ProtocolListener;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.reporting.BulletinRepository;
@@ -107,4 +110,9 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
     public void disconnect(DisconnectMessage msg) throws ProtocolException {
         sender.disconnect(msg);
     }
+
+    @Override
+    public void notifyNodeStatusChange(Set<NodeIdentifier> nodesToNotify, NodeStatusChangeMessage msg) {
+        sender.notifyNodeStatusChange(nodesToNotify, msg);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
index 886553e..10a58cf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.cluster.protocol.impl;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.security.cert.CertificateException;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
@@ -27,11 +28,9 @@ import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.io.socket.SocketConfiguration;
 import org.apache.nifi.io.socket.SocketUtils;
 import org.apache.nifi.io.socket.multicast.DiscoverableService;
@@ -95,6 +94,20 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
         }
     }
 
+    public void heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
+        final String hostname;
+        final int port;
+        try {
+            final String[] parts = address.split(":");
+            hostname = parts[0];
+            port = Integer.parseInt(parts[1]);
+        } catch (final Exception e) {
+            throw new IllegalArgumentException("Cannot send heartbeat to address [" + address + "]. Address must be in <hostname>:<port> format");
+        }
+
+        sendProtocolMessage(msg, hostname, port);
+    }
+
     private String getNCMDN(Socket socket) {
         try {
             return CertificateUtils.extractClientDNFromSSLSocket(socket);
@@ -103,21 +116,6 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
         }
     }
 
-    @Override
-    public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sendProtocolMessage(msg);
-    }
-
-    @Override
-    public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sendProtocolMessage(msg);
-    }
-
-    @Override
-    public void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sendProtocolMessage(msg);
-    }
-
     private Socket createSocket() {
         // determine the cluster manager's address
         final DiscoverableService service = clusterManagerProtocolServiceLocator.getService();
@@ -133,10 +131,18 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
         }
     }
 
-    private void sendProtocolMessage(final ProtocolMessage msg) {
+    public SocketConfiguration getSocketConfiguration() {
+        return socketConfiguration;
+    }
+
+    private void sendProtocolMessage(final ProtocolMessage msg, final String hostname, final int port) {
         Socket socket = null;
         try {
-            socket = createSocket();
+            try {
+                socket = SocketUtils.createSocket(new InetSocketAddress(hostname, port), socketConfiguration);
+            } catch (IOException e) {
+                throw new ProtocolException("Failed to send message to Cluster Coordinator due to: " + e, e);
+            }
 
             try {
                 // marshal message to output stream
@@ -149,9 +155,4 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
             SocketUtils.closeQuietly(socket);
         }
     }
-
-    public SocketConfiguration getSocketConfiguration() {
-        return socketConfiguration;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
index 0a9a064..3d0eb8e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
@@ -26,9 +26,7 @@ import org.apache.nifi.cluster.protocol.ProtocolListener;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.reporting.BulletinRepository;
 
 public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolListener {
@@ -83,27 +81,17 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL
     }
 
     @Override
-    public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sender.heartbeat(msg);
-    }
-
-    @Override
     public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
         return sender.requestConnection(msg);
     }
 
     @Override
-    public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sender.notifyControllerStartupFailure(msg);
-    }
-
-    @Override
-    public void notifyReconnectionFailure(final ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sender.notifyReconnectionFailure(msg);
+    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+        listener.setBulletinRepository(bulletinRepository);
     }
 
     @Override
-    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
-        listener.setBulletinRepository(bulletinRepository);
+    public void heartbeat(HeartbeatMessage msg, String address) throws ProtocolException {
+        sender.heartbeat(msg, address);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
index 4243b41..d42515e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
@@ -28,7 +28,6 @@ public class AdaptedConnectionResponse {
     private StandardDataFlow dataFlow;
     private NodeIdentifier nodeIdentifier;
     private String rejectionReason;
-    private boolean primary;
     private int tryLaterSeconds;
     private Integer managerRemoteInputPort;
     private Boolean managerRemoteCommsSecure;
@@ -71,14 +70,6 @@ public class AdaptedConnectionResponse {
         this.rejectionReason = rejectionReason;
     }
 
-    public boolean isPrimary() {
-        return primary;
-    }
-
-    public void setPrimary(boolean primary) {
-        this.primary = primary;
-    }
-
     public boolean shouldTryLater() {
         return tryLaterSeconds > 0;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
index cec3757..9501b48 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.cluster.protocol.jaxb.message;
 
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 
 /**
@@ -26,7 +28,7 @@ public class AdaptedHeartbeat {
     private NodeIdentifier nodeIdentifier;
     private byte[] payload;
     private boolean primary;
-    private boolean connected;
+    private NodeConnectionStatus connectionStatus;
 
     public AdaptedHeartbeat() {
     }
@@ -48,12 +50,12 @@ public class AdaptedHeartbeat {
         this.primary = primary;
     }
 
-    public boolean isConnected() {
-        return connected;
+    public void setConnectionStatus(NodeConnectionStatus connectionStatus) {
+        this.connectionStatus = connectionStatus;
     }
 
-    public void setConnected(boolean connected) {
-        this.connected = connected;
+    public NodeConnectionStatus getConnectionStatus() {
+        return connectionStatus;
     }
 
     public byte[] getPayload() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
new file mode 100644
index 0000000..f9ec3b1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+
+public class AdaptedNodeConnectionStatus {
+    private NodeConnectionState state;
+    private DisconnectionCode disconnectCode;
+    private String disconnectReason;
+    private Long connectionRequestTime;
+
+    public NodeConnectionState getState() {
+        return state;
+    }
+
+    public void setState(NodeConnectionState state) {
+        this.state = state;
+    }
+
+    public DisconnectionCode getDisconnectCode() {
+        return disconnectCode;
+    }
+
+    public void setDisconnectCode(DisconnectionCode disconnectCode) {
+        this.disconnectCode = disconnectCode;
+    }
+
+    public String getDisconnectReason() {
+        return disconnectReason;
+    }
+
+    public void setDisconnectReason(String disconnectReason) {
+        this.disconnectReason = disconnectReason;
+    }
+
+    public Long getConnectionRequestTime() {
+        return connectionRequestTime;
+    }
+
+    public void setConnectionRequestTime(Long connectionRequestTime) {
+        this.connectionRequestTime = connectionRequestTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
index b2c1c67..a1bc907 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
@@ -31,7 +31,6 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo
             aCr.setNodeIdentifier(cr.getNodeIdentifier());
             aCr.setTryLaterSeconds(cr.getTryLaterSeconds());
             aCr.setRejectionReason(cr.getRejectionReason());
-            aCr.setPrimary(cr.isPrimary());
             aCr.setManagerRemoteInputPort(cr.getManagerRemoteInputPort());
             aCr.setManagerRemoteCommsSecure(cr.isManagerRemoteCommsSecure());
             aCr.setInstanceId(cr.getInstanceId());
@@ -46,7 +45,7 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo
         } else if (aCr.getRejectionReason() != null) {
             return ConnectionResponse.createRejectionResponse(aCr.getRejectionReason());
         } else {
-            return new ConnectionResponse(aCr.getNodeIdentifier(), aCr.getDataFlow(), aCr.isPrimary(),
+            return new ConnectionResponse(aCr.getNodeIdentifier(), aCr.getDataFlow(),
                 aCr.getManagerRemoteInputPort(), aCr.isManagerRemoteCommsSecure(), aCr.getInstanceId());
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
index 412332f..2666abd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
@@ -25,7 +25,6 @@ public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> {
 
     @Override
     public AdaptedHeartbeat marshal(final Heartbeat hb) {
-
         final AdaptedHeartbeat aHb = new AdaptedHeartbeat();
 
         if (hb != null) {
@@ -39,7 +38,7 @@ public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> {
             aHb.setPrimary(hb.isPrimary());
 
             // set connected flag
-            aHb.setConnected(hb.isConnected());
+            aHb.setConnectionStatus(hb.getConnectionStatus());
         }
 
         return aHb;
@@ -47,7 +46,7 @@ public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> {
 
     @Override
     public Heartbeat unmarshal(final AdaptedHeartbeat aHb) {
-        return new Heartbeat(aHb.getNodeIdentifier(), aHb.isPrimary(), aHb.isConnected(), aHb.getPayload());
+        return new Heartbeat(aHb.getNodeIdentifier(), aHb.isPrimary(), aHb.getConnectionStatus(), aHb.getPayload());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
new file mode 100644
index 0000000..e2c302d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+
+public class NodeConnectionStatusAdapter extends XmlAdapter<AdaptedNodeConnectionStatus, NodeConnectionStatus> {
+
+    @Override
+    public NodeConnectionStatus unmarshal(final AdaptedNodeConnectionStatus adapted) throws Exception {
+        return new NodeConnectionStatus(adapted.getState(), adapted.getDisconnectCode(), adapted.getDisconnectReason(), adapted.getConnectionRequestTime());
+    }
+
+    @Override
+    public AdaptedNodeConnectionStatus marshal(final NodeConnectionStatus toAdapt) throws Exception {
+        final AdaptedNodeConnectionStatus adapted = new AdaptedNodeConnectionStatus();
+        adapted.setConnectionRequestTime(toAdapt.getConnectionRequestTime());
+        adapted.setDisconnectCode(toAdapt.getDisconnectCode());
+        adapted.setDisconnectReason(toAdapt.getDisconnectReason());
+        adapted.setState(toAdapt.getState());
+        return adapted;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index 25041ce..82df546 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -20,14 +20,13 @@ import javax.xml.bind.annotation.XmlRegistry;
 
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
@@ -44,10 +43,6 @@ public class ObjectFactory {
         return new ReconnectionRequestMessage();
     }
 
-    public ReconnectionFailureMessage createReconnectionFailureMessage() {
-        return new ReconnectionFailureMessage();
-    }
-
     public ReconnectionResponseMessage createReconnectionResponseMessage() {
         return new ReconnectionResponseMessage();
     }
@@ -88,7 +83,7 @@ public class ObjectFactory {
         return new MulticastProtocolMessage();
     }
 
-    public ControllerStartupFailureMessage createControllerStartupFailureMessage() {
-        return new ControllerStartupFailureMessage();
+    public NodeStatusChangeMessage createNodeStatusChangeMessage() {
+        return new NodeStatusChangeMessage();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java
deleted file mode 100644
index 3d3bd43..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.message;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
-
-/**
- */
-@XmlRootElement(name = "controllerStartupFailureMessage")
-public class ControllerStartupFailureMessage extends ExceptionMessage {
-
-    private NodeIdentifier nodeId;
-
-    public ControllerStartupFailureMessage() {
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.CONTROLLER_STARTUP_FAILURE;
-    }
-
-    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
-    public NodeIdentifier getNodeId() {
-        return nodeId;
-    }
-
-    public void setNodeId(NodeIdentifier nodeId) {
-        this.nodeId = nodeId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java
index 15432b1..b5b2ecb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java
@@ -16,9 +16,10 @@
  */
 package org.apache.nifi.cluster.protocol.message;
 
-import org.apache.nifi.cluster.protocol.Heartbeat;
 import javax.xml.bind.annotation.XmlRootElement;
 
+import org.apache.nifi.cluster.protocol.Heartbeat;
+
 /**
  */
 @XmlRootElement(name = "heartbeatMessage")
@@ -26,11 +27,6 @@ public class HeartbeatMessage extends ProtocolMessage {
 
     private Heartbeat heartbeat;
 
-    @Override
-    public MessageType getType() {
-        return MessageType.HEARTBEAT;
-    }
-
     public Heartbeat getHeartbeat() {
         return heartbeat;
     }
@@ -39,4 +35,8 @@ public class HeartbeatMessage extends ProtocolMessage {
         this.heartbeat = heartbeat;
     }
 
+    @Override
+    public MessageType getType() {
+        return MessageType.HEARTBEAT;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java
new file mode 100644
index 0000000..7a99d0e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * Message to indicate that the status of a node in the cluster has changed
+ */
+@XmlRootElement(name = "nodeStatusChange")
+public class NodeStatusChangeMessage extends ProtocolMessage {
+    private NodeConnectionStatus connectionStatus;
+    private NodeIdentifier nodeId;
+    private Long statusUpdateId = -1L;
+
+    @Override
+    public MessageType getType() {
+        return MessageType.NODE_STATUS_CHANGE;
+    }
+
+    public void setNodeConnectionStatus(final NodeConnectionStatus status) {
+        this.connectionStatus = status;
+    }
+
+    public NodeConnectionStatus getNodeConnectionStatus() {
+        return connectionStatus;
+    }
+
+    public NodeIdentifier getNodeId() {
+        return nodeId;
+    }
+
+    public void setNodeId(NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public Long getStatusUpdateIdentifier() {
+        return statusUpdateId;
+    }
+
+    public void setStatusUpdateIdentifier(Long statusUpdateId) {
+        this.statusUpdateId = statusUpdateId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index 5953e09..27be95f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -23,8 +23,6 @@ public abstract class ProtocolMessage {
     public static enum MessageType {
         CONNECTION_REQUEST,
         CONNECTION_RESPONSE,
-        CONTROLLER_STARTUP_FAILURE,
-        RECONNECTION_FAILURE,
         DISCONNECTION_REQUEST,
         EXCEPTION,
         FLOW_REQUEST,
@@ -34,6 +32,7 @@ public abstract class ProtocolMessage {
         RECONNECTION_REQUEST,
         RECONNECTION_RESPONSE,
         SERVICE_BROADCAST,
+        NODE_STATUS_CHANGE;
     }
 
     public abstract MessageType getType();

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java
deleted file mode 100644
index ce62c5b..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.message;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
-
-@XmlRootElement(name = "reconnectionFailureMessage")
-public class ReconnectionFailureMessage extends ExceptionMessage {
-
-    private NodeIdentifier nodeId;
-
-    public ReconnectionFailureMessage() {
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.RECONNECTION_FAILURE;
-    }
-
-    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
-    public NodeIdentifier getNodeId() {
-        return nodeId;
-    }
-
-    public void setNodeId(NodeIdentifier nodeId) {
-        this.nodeId = nodeId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
index 94c0a20..f9a986f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
@@ -28,7 +28,6 @@ import java.util.UUID;
 
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolContext;
 import org.apache.nifi.cluster.protocol.ProtocolException;
@@ -39,8 +38,6 @@ import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
 import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.PingMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.io.socket.ServerSocketConfiguration;
@@ -109,7 +106,7 @@ public class NodeProtocolSenderImplTest {
         when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
         ConnectionResponseMessage mockMessage = new ConnectionResponseMessage();
         mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier,
-            new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString()));
+            new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), null, null, UUID.randomUUID().toString()));
         when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage);
 
         ConnectionRequestMessage request = new ConnectionRequestMessage();
@@ -168,30 +165,4 @@ public class NodeProtocolSenderImplTest {
         fail("failed to throw exception");
 
     }
-
-    @Test
-    public void testHeartbeat() throws Exception {
-
-        when(mockServiceLocator.getService()).thenReturn(service);
-        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
-
-        HeartbeatMessage hb = new HeartbeatMessage();
-        hb.setHeartbeat(new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4, "localhost", 3821, false), false, false, new byte[] {1, 2, 3}));
-        sender.heartbeat(hb);
-    }
-
-    @Test
-    public void testNotifyControllerStartupFailure() throws Exception {
-
-        when(mockServiceLocator.getService()).thenReturn(service);
-        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
-
-        ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage();
-        msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1, "localhost", 3821, false));
-        msg.setExceptionMessage("some exception");
-        sender.notifyControllerStartupFailure(msg);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
index 1e1eae7..e5a1a7d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
@@ -134,6 +134,19 @@
             <groupId>org.springframework</groupId>
             <artifactId>spring-context</artifactId>
         </dependency>
+        
+        <!-- testing dependencies for ZooKeeper / Curator -->
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
+        
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
new file mode 100644
index 0000000..59ded24
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination;
+
+import java.util.Set;
+
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.reporting.Severity;
+
+/**
+ * <p>
+ * Responsible for coordinating nodes in the cluster
+ * <p>
+ */
+public interface ClusterCoordinator {
+
+    /**
+     * Sends a request to the node to connect to the cluster. This will immediately
+     * set the NodeConnectionStatus to DISCONNECTED.
+     *
+     * @param nodeId the identifier of the node
+     */
+    void requestNodeConnect(NodeIdentifier nodeId);
+
+    /**
+     * Indicates that the node has sent a valid heartbeat and should now
+     * be considered part of the cluster
+     *
+     * @param nodeId the identifier of the node
+     */
+    void finishNodeConnection(NodeIdentifier nodeId);
+
+    /**
+     * Sends a request to the node to disconnect from the cluster.
+     * The node will be marked as disconnected immediately.
+     *
+     * @param nodeId the identifier of the node
+     * @param disconnectionCode the code that represents why this node is being asked to disconnect
+     * @param explanation an explanation as to why the node is being asked to disconnect
+     *            from the cluster
+     */
+    void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation);
+
+    /**
+     * Notifies the Cluster Coordinator that the node with the given ID has requested to disconnect
+     * from the cluster. If no node exists in the cluster with the given ID, this method has no effect.
+     *
+     * @param nodeId the identifier of the node
+     * @param disconnectionCode the code that represents why this node is requesting to disconnect
+     * @param explanation an explanation as to why the node is requesting to disconnect from the cluster
+     */
+    void disconnectionRequestedByNode(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation);
+
+    /**
+     * Returns the current status of the node with the given identifier
+     *
+     * @param nodeId the identifier of the node
+     *
+     * @return the current status of the node with the given identifier,
+     *         or <code>null</code> if no node is known with the given identifier
+     */
+    NodeConnectionStatus getConnectionStatus(NodeIdentifier nodeId);
+
+    /**
+     * Returns the identifiers of all nodes that have the given connection state
+     *
+     * @param state the state
+     * @return the identifiers of all nodes that have the given connection state
+     */
+    Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState state);
+
+    /**
+     * Checks if the given hostname is blocked by the configured firewall, returning
+     * <code>true</code> if the node is blocked, <code>false</code> if the node is
+     * allowed through the firewall or if there is no firewall configured
+     *
+     * @param hostname the hostname of the node that is attempting to connect to the cluster
+     *
+     * @return <code>true</code> if the node is blocked, <code>false</code> if the node is
+     *         allowed through the firewall or if there is no firewall configured
+     */
+    boolean isBlockedByFirewall(String hostname);
+
+    /**
+     * Reports that some event occurred that is relevant to the cluster
+     *
+     * @param nodeId the identifier of the node that the event pertains to, or <code>null</code> if not applicable
+     * @param severity the severity of the event
+     * @param event an explanation of the event
+     */
+    void reportEvent(NodeIdentifier nodeId, Severity severity, String event);
+
+    /**
+     * Updates the node that is considered the Primary Node
+     *
+     * @param nodeId the id of the Primary Node
+     */
+    void setPrimaryNode(NodeIdentifier nodeId);
+
+    /**
+     * Returns the NodeIdentifier that exists that has the given UUID, or <code>null</code> if no NodeIdentifier
+     * exists for the given UUID
+     *
+     * @param uuid the UUID of the NodeIdentifier to obtain
+     * @return the NodeIdentifier that exists that has the given UUID, or <code>null</code> if no NodeIdentifier
+     *         exists for the given UUID
+     */
+    NodeIdentifier getNodeIdentifier(String uuid);
+}


[3/4] nifi git commit: NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination

Posted by mc...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
new file mode 100644
index 0000000..d6838cd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
+
+    private final int heartbeatIntervalMillis;
+    private static final Logger logger = LoggerFactory.getLogger(AbstractHeartbeatMonitor.class);
+    protected final ClusterCoordinator clusterCoordinator;
+    protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true);
+
+    protected volatile long latestHeartbeatTime;
+    private volatile ScheduledFuture<?> future;
+    private volatile boolean stopped = true;
+
+
+    public AbstractHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) {
+        this.clusterCoordinator = clusterCoordinator;
+        final String heartbeatInterval = properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL,
+            NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL);
+        this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public final void start() {
+        stopped = false;
+        onStart();
+
+        this.future = flowEngine.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    monitorHeartbeats();
+                } catch (final Exception e) {
+                    clusterCoordinator.reportEvent(null, Severity.ERROR, "Failed to process heartbeats from nodes due to " + e.toString());
+                    logger.error("Failed to process heartbeats", e);
+                }
+            }
+        }, heartbeatIntervalMillis, heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public final void stop() {
+        this.stopped = true;
+
+        try {
+            if (future != null) {
+                future.cancel(true);
+            }
+        } finally {
+            onStop();
+        }
+    }
+
+    protected boolean isStopped() {
+        return stopped;
+    }
+
+    @Override
+    public NodeHeartbeat getLatestHeartbeat(final NodeIdentifier nodeId) {
+        return getLatestHeartbeats().get(nodeId);
+    }
+
+    protected ClusterCoordinator getClusterCoordinator() {
+        return clusterCoordinator;
+    }
+
+    protected long getHeartbeatInterval(final TimeUnit timeUnit) {
+        return timeUnit.convert(heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Fetches all of the latest heartbeats and updates the Cluster Coordinator as appropriate,
+     * based on the heartbeats received.
+     *
+     * Visible for testing.
+     */
+    protected synchronized void monitorHeartbeats() {
+        final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = getLatestHeartbeats();
+        if (latestHeartbeats == null || latestHeartbeats.isEmpty()) {
+            // failed to fetch heartbeats; don't change anything.
+            clusterCoordinator.reportEvent(null, Severity.WARNING, "Failed to retrieve any new heartbeat information for nodes. "
+                + "Will not make any decisions based on heartbeats.");
+            return;
+        }
+
+        final StopWatch procStopWatch = new StopWatch(true);
+        for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) {
+            try {
+                processHeartbeat(heartbeat);
+            } catch (final Exception e) {
+                clusterCoordinator.reportEvent(null, Severity.ERROR,
+                    "Received heartbeat from " + heartbeat.getNodeIdentifier() + " but failed to process heartbeat due to " + e);
+                logger.error("Failed to process heartbeat from {} due to {}", heartbeat.getNodeIdentifier(), e.toString());
+                logger.error("", e);
+            }
+        }
+
+        procStopWatch.stop();
+        logger.info("Finished processing {} heartbeats in {}", latestHeartbeats.size(), procStopWatch.getDuration());
+
+        // Disconnect any node that hasn't sent a heartbeat in a long time (8 times the heartbeat interval)
+        final long maxMillis = heartbeatIntervalMillis * 1000L * 8;
+        final long threshold = latestHeartbeatTime - maxMillis;
+        for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) {
+            if (heartbeat.getTimestamp() < threshold) {
+                clusterCoordinator.requestNodeDisconnect(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT,
+                    "Latest heartbeat from Node has expired");
+
+                try {
+                    removeHeartbeat(heartbeat.getNodeIdentifier());
+                } catch (final Exception e) {
+                    logger.warn("Failed to remove heartbeat for {} due to {}", heartbeat.getNodeIdentifier(), e.toString());
+                    logger.warn("", e);
+                }
+            }
+        }
+    }
+
+    private void processHeartbeat(final NodeHeartbeat heartbeat) {
+        final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
+
+        // Do not process heartbeat if it's blocked by firewall.
+        if (clusterCoordinator.isBlockedByFirewall(nodeId.getSocketAddress())) {
+            clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Firewall blocked received heartbeat. Issuing disconnection request.");
+
+            // request node to disconnect
+            clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.BLOCKED_BY_FIREWALL, "Blocked by Firewall");
+            removeHeartbeat(nodeId);
+            return;
+        }
+
+        final NodeConnectionStatus connectionStatus = clusterCoordinator.getConnectionStatus(nodeId);
+        if (connectionStatus == null) {
+            final NodeConnectionState hbConnectionState = heartbeat.getConnectionStatus().getState();
+            if (hbConnectionState == NodeConnectionState.DISCONNECTED || hbConnectionState == NodeConnectionState.DISCONNECTING) {
+                // Node is not part of the cluster. Remove heartbeat and move on.
+                removeHeartbeat(nodeId);
+                return;
+            }
+
+            // Unknown node. Issue reconnect request
+            clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from unknown node. Removing heartbeat and requesting that node connect to cluster.");
+            removeHeartbeat(nodeId);
+
+            clusterCoordinator.requestNodeConnect(nodeId);
+            return;
+        }
+
+        final DisconnectionCode reportedDisconnectCode = heartbeat.getConnectionStatus().getDisconnectCode();
+        if (reportedDisconnectCode != null) {
+            // Check if the node is notifying us that it wants to disconnect from the cluster
+            final boolean requestingDisconnect;
+            switch (reportedDisconnectCode) {
+                case MISMATCHED_FLOWS:
+                case NODE_SHUTDOWN:
+                case STARTUP_FAILURE:
+                    final NodeConnectionState expectedState = connectionStatus.getState();
+                    requestingDisconnect = expectedState == NodeConnectionState.CONNECTED || expectedState == NodeConnectionState.CONNECTING;
+                    break;
+                default:
+                    requestingDisconnect = false;
+                    break;
+            }
+
+            if (requestingDisconnect) {
+                clusterCoordinator.disconnectionRequestedByNode(nodeId, heartbeat.getConnectionStatus().getDisconnectCode(),
+                    heartbeat.getConnectionStatus().getDisconnectReason());
+                removeHeartbeat(nodeId);
+                return;
+            }
+        }
+
+        final NodeConnectionState connectionState = connectionStatus.getState();
+        if (heartbeat.getConnectionStatus().getState() != NodeConnectionState.CONNECTED && connectionState == NodeConnectionState.CONNECTED) {
+            // Cluster Coordinator believes that node is connected, but node does not believe so.
+            clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Received heartbeat from node that thinks it is not yet part of the cluster,"
+                + "though the Cluster Coordinator thought it was (node claimed state was " + heartbeat.getConnectionStatus().getState()
+                + "). Marking as Disconnected and requesting that Node reconnect to cluster");
+            clusterCoordinator.requestNodeConnect(nodeId);
+            return;
+        }
+
+        if (NodeConnectionState.DISCONNECTED == connectionState) {
+            // ignore heartbeats from nodes disconnected by means other than lack of heartbeat, unless it is
+            // the only node. We allow it if it is the only node because if we have a one-node cluster, then
+            // we cannot manually reconnect it.
+            final DisconnectionCode disconnectionCode = connectionStatus.getDisconnectCode();
+
+            if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT) {
+                // record event
+                clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously "
+                    + "disconnected due to lack of heartbeat. Issuing reconnection request.");
+
+                clusterCoordinator.requestNodeConnect(nodeId);
+            } else {
+                // disconnected nodes should not heartbeat, so we need to issue a disconnection request
+                logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ".  Issuing disconnection request.");
+                clusterCoordinator.requestNodeDisconnect(nodeId, connectionStatus.getDisconnectCode(), connectionStatus.getDisconnectReason());
+                removeHeartbeat(nodeId);
+            }
+
+            return;
+        }
+
+        if (NodeConnectionState.DISCONNECTING == connectionStatus.getState()) {
+            // ignore spurious heartbeat
+            removeHeartbeat(nodeId);
+            return;
+        }
+
+        // first heartbeat causes status change from connecting to connected
+        if (NodeConnectionState.CONNECTING == connectionState) {
+            final Long connectionRequestTime = connectionStatus.getConnectionRequestTime();
+            if (connectionRequestTime != null && heartbeat.getTimestamp() < connectionRequestTime) {
+                clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat but ignoring because it was reported before the node was last asked to reconnect.");
+                removeHeartbeat(nodeId);
+                return;
+            }
+
+            // connection complete
+            clusterCoordinator.finishNodeConnection(nodeId);
+            clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected.");
+        }
+
+        if (heartbeat.isPrimary()) {
+            clusterCoordinator.setPrimaryNode(nodeId);
+        }
+    }
+
+
+    /**
+     * @return the most recent heartbeat information for each node in the cluster
+     */
+    protected abstract Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats();
+
+    /**
+     * This method does nothing in the abstract class but is meant for subclasses to
+     * override in order to provide functionality when the monitor is started.
+     */
+    protected void onStart() {
+    }
+
+    /**
+     * This method does nothing in the abstract class but is meant for subclasses to
+     * override in order to provide functionality when the monitor is stopped.
+     */
+    protected void onStop() {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
new file mode 100644
index 0000000..d9ef0be
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryForever;
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
+import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses Apache Curator to monitor heartbeats from nodes
+ */
+public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor implements HeartbeatMonitor, ProtocolHandler {
+    protected static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class);
+    private static final String COORDINATOR_ZNODE_NAME = "coordinator";
+
+    private final ZooKeeperClientConfig zkClientConfig;
+    private final String clusterNodesPath;
+
+    private volatile CuratorFramework curatorClient;
+    private volatile Map<String, NodeIdentifier> clusterNodeIds = new HashMap<>();
+
+    private final String heartbeatAddress;
+    private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages = new ConcurrentHashMap<>();
+
+    protected static final Unmarshaller nodeIdentifierUnmarshaller;
+
+    static {
+        try {
+            final JAXBContext jaxbContext = JAXBContext.newInstance(NodeIdentifier.class);
+            nodeIdentifierUnmarshaller = jaxbContext.createUnmarshaller();
+        } catch (final Exception e) {
+            throw new RuntimeException("Failed to create an Unmarshaller for unmarshalling Node Identifier", e);
+        }
+    }
+
+
+    public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) {
+        super(clusterCoordinator, properties);
+
+        this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties);
+        this.clusterNodesPath = zkClientConfig.resolvePath("cluster/nodes");
+
+        String hostname = properties.getProperty(NiFiProperties.CLUSTER_MANAGER_ADDRESS);
+        if (hostname == null) {
+            try {
+                hostname = InetAddress.getLocalHost().getHostName();
+            } catch (UnknownHostException e) {
+                throw new RuntimeException("Unable to determine local hostname and the '" + NiFiProperties.CLUSTER_MANAGER_ADDRESS + "' property is not set");
+            }
+        }
+
+        final String port = properties.getProperty(NiFiProperties.CLUSTER_MANAGER_PROTOCOL_PORT);
+        if (port == null) {
+            throw new RuntimeException("Unable to determine which port Cluster Manager Protocol is listening on because the '"
+                + NiFiProperties.CLUSTER_MANAGER_PROTOCOL_PORT + "' property is not set");
+        }
+
+        try {
+            Integer.parseInt(port);
+        } catch (final NumberFormatException nfe) {
+            throw new RuntimeException("Unable to determine which port Cluster Manager Protocol is listening on because the '"
+                + NiFiProperties.CLUSTER_MANAGER_PROTOCOL_PORT + "' property is set to '" + port + "', which is not a valid port number.");
+        }
+
+        heartbeatAddress = hostname + ":" + port;
+    }
+
+    @Override
+    public void onStart() {
+        final RetryPolicy retryPolicy = new RetryForever(5000);
+        curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(),
+            zkClientConfig.getSessionTimeoutMillis(), zkClientConfig.getConnectionTimeoutMillis(), retryPolicy);
+        curatorClient.start();
+
+        final Thread publishAddress = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (!isStopped()) {
+                    final String path = clusterNodesPath + "/" + COORDINATOR_ZNODE_NAME;
+                    try {
+                        try {
+                            curatorClient.setData().forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8));
+                            return;
+                        } catch (final NoNodeException nne) {
+                            // ensure that parents are created, using a wide-open ACL because the parents contain no data
+                            // and the path is not in any way sensitive.
+                            try {
+                                curatorClient.create().creatingParentContainersIfNeeded().forPath(path);
+                            } catch (final NodeExistsException nee) {
+                                // This is okay. Node already exists.
+                            }
+
+                            curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8));
+                            logger.info("Successfully created node in ZooKeeper with path {}", path);
+
+                            return;
+                        }
+                    } catch (Exception e) {
+                        logger.warn("Failed to update ZooKeeper to notify nodes of the heartbeat address. Will continue to retry.");
+
+                        try {
+                            Thread.sleep(2000L);
+                        } catch (final InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                            return;
+                        }
+                    }
+                }
+            }
+        });
+
+        publishAddress.setName("Publish Heartbeat Address");
+        publishAddress.setDaemon(true);
+        publishAddress.start();
+    }
+
+    private CuratorFramework getClient() {
+        return curatorClient;
+    }
+
+    @Override
+    public void onStop() {
+        final CuratorFramework client = getClient();
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Override
+    protected Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats() {
+        return Collections.unmodifiableMap(heartbeatMessages);
+    }
+
+    @Override
+    public synchronized void removeHeartbeat(final NodeIdentifier nodeId) {
+        logger.debug("Deleting heartbeat for node {}", nodeId);
+        final String nodeInfoPath = clusterNodesPath + "/" + nodeId.getId();
+
+        heartbeatMessages.remove(nodeId);
+
+        try {
+            getClient().delete().forPath(nodeInfoPath);
+            logger.info("Removed heartbeat from ZooKeeper for Node {}", nodeId);
+        } catch (final NoNodeException e) {
+            // node did not exist. Just return.
+            logger.debug("Attempted to remove heartbeat for Node with ID {} but no ZNode existed at {}", nodeId, nodeInfoPath);
+            return;
+        } catch (final Exception e) {
+            logger.warn("Failed to remove heartbeat from ZooKeeper for Node {} due to {}", nodeId, e);
+            logger.warn("", e);
+
+            clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Failed to remove node's heartbeat from ZooKeeper due to " + e);
+        }
+    }
+
+    protected Set<NodeIdentifier> getClusterNodeIds() {
+        return new HashSet<>(clusterNodeIds.values());
+    }
+
+
+    @Override
+    public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
+        if (msg.getType() != MessageType.HEARTBEAT) {
+            throw new ProtocolException("Cannot handle message of type " + msg.getType());
+        }
+
+        final HeartbeatMessage heartbeatMsg = (HeartbeatMessage) msg;
+        final Heartbeat heartbeat = heartbeatMsg.getHeartbeat();
+
+        final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
+        final NodeConnectionStatus connectionStatus = heartbeat.getConnectionStatus();
+        final boolean primary = heartbeat.isPrimary();
+        final byte[] payloadBytes = heartbeat.getPayload();
+        final HeartbeatPayload payload = HeartbeatPayload.unmarshal(payloadBytes);
+        final int activeThreadCount = payload.getActiveThreadCount();
+        final int flowFileCount = (int) payload.getTotalFlowFileCount();
+        final long flowFileBytes = payload.getTotalFlowFileBytes();
+        final long systemStartTime = payload.getSystemStartTime();
+
+        final NodeHeartbeat nodeHeartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(),
+            connectionStatus, primary, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime);
+        heartbeatMessages.put(heartbeat.getNodeIdentifier(), nodeHeartbeat);
+        logger.debug("Received new heartbeat from {}", nodeId);
+
+        return null;
+    }
+
+    @Override
+    public boolean canHandle(ProtocolMessage msg) {
+        return msg.getType() == MessageType.HEARTBEAT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
new file mode 100644
index 0000000..c151382
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * A HeartbeatMonitor is responsible for monitoring some remote resource for heartbeats from each
+ * node in a cluster and reacting to those heartbeats (or lack thereof).
+ */
+public interface HeartbeatMonitor {
+
+    /**
+     * Begin monitoring for heartbeats
+     */
+    void start();
+
+    /**
+     * Stop monitoring heartbeats
+     */
+    void stop();
+
+    /**
+     * Returns the latest heartbeat that has been obtained for the node with
+     * the given id
+     *
+     * @param nodeId the id of the node whose heartbeat should be retrieved
+     * @return the latest heartbeat that has been obtained for the node with
+     *         the given id, or <code>null</code> if no heartbeat has been obtained
+     */
+    NodeHeartbeat getLatestHeartbeat(NodeIdentifier nodeId);
+
+    /**
+     * Removes the heartbeat for the given node from the monitor and the
+     * remote location where heartbeats are sent
+     *
+     * @param nodeId the id of the node whose heartbeat should be removed
+     */
+    void removeHeartbeat(NodeIdentifier nodeId);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
new file mode 100644
index 0000000..bd66022
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+public interface NodeHeartbeat {
+    /**
+     * @return the time at which the node reported the heartbeat, according
+     *         to the system that received the heartbeat
+     */
+    long getTimestamp();
+
+    /**
+     * @return the identifier of the node that sent the heartbeat
+     */
+    NodeIdentifier getNodeIdentifier();
+
+    /**
+     * @return the Connection Status reported by the node
+     */
+    NodeConnectionStatus getConnectionStatus();
+
+    /**
+     * @return <code>true</code> if the node is the Primary Node in the cluster, <code>false</code> otherwise
+     */
+    boolean isPrimary();
+
+    /**
+     * @return the number of FlowFiles that are queued up on the node
+     */
+    int getFlowFileCount();
+
+    /**
+     * @return the total size of all FlowFiles that are queued up on the node
+     */
+    long getFlowFileBytes();
+
+    /**
+     * @return the number of threads that are actively running in Processors and Reporting Tasks on the node
+     */
+    int getActiveThreadCount();
+
+    /**
+     * @return the time that the node reports having started NiFi
+     */
+    long getSystemStartTime();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
new file mode 100644
index 0000000..133bab0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+
+public class StandardNodeHeartbeat implements NodeHeartbeat {
+
+    private final NodeIdentifier nodeId;
+    private final long timestamp;
+    private final NodeConnectionStatus connectionStatus;
+    private final boolean primary;
+    private final int flowFileCount;
+    private final long flowFileBytes;
+    private final int activeThreadCount;
+    private final long systemStartTime;
+
+    public StandardNodeHeartbeat(final NodeIdentifier nodeId, final long timestamp, final NodeConnectionStatus connectionStatus,
+        final boolean primary, final int flowFileCount, final long flowFileBytes, final int activeThreadCount, final long systemStartTime) {
+        this.timestamp = timestamp;
+        this.nodeId = nodeId;
+        this.connectionStatus = connectionStatus;
+        this.primary = primary;
+        this.flowFileCount = flowFileCount;
+        this.flowFileBytes = flowFileBytes;
+        this.activeThreadCount = activeThreadCount;
+        this.systemStartTime = systemStartTime;
+    }
+
+    @Override
+    public NodeIdentifier getNodeIdentifier() {
+        return nodeId;
+    }
+
+    @Override
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public NodeConnectionStatus getConnectionStatus() {
+        return connectionStatus;
+    }
+
+    @Override
+    public boolean isPrimary() {
+        return primary;
+    }
+
+    @Override
+    public int getFlowFileCount() {
+        return flowFileCount;
+    }
+
+    @Override
+    public long getFlowFileBytes() {
+        return flowFileBytes;
+    }
+
+    @Override
+    public int getActiveThreadCount() {
+        return activeThreadCount;
+    }
+
+
+    @Override
+    public long getSystemStartTime() {
+        return systemStartTime;
+    }
+
+    public static StandardNodeHeartbeat fromHeartbeatMessage(final HeartbeatMessage message, final long timestamp) {
+        final Heartbeat heartbeat = message.getHeartbeat();
+        final HeartbeatPayload payload = HeartbeatPayload.unmarshal(heartbeat.getPayload());
+
+        return new StandardNodeHeartbeat(heartbeat.getNodeIdentifier(), timestamp, heartbeat.getConnectionStatus(),
+            heartbeat.isPrimary(), (int) payload.getTotalFlowFileCount(), payload.getTotalFlowFileBytes(),
+            payload.getActiveThreadCount(), payload.getSystemStartTime());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterNode.java
new file mode 100644
index 0000000..d04d144
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterNode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+public class ClusterNode {
+    private final NodeIdentifier nodeId;
+    private NodeConnectionStatus connectionStatus = new NodeConnectionStatus(NodeConnectionState.DISCONNECTED, DisconnectionCode.NOT_YET_CONNECTED);
+
+
+    public ClusterNode(final NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public NodeIdentifier getIdentifier() {
+        return nodeId;
+    }
+
+    public NodeConnectionStatus getConnectionStatus() {
+        return connectionStatus;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
index 27ada88..de3c23e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
@@ -29,7 +29,6 @@ import org.apache.nifi.cluster.node.Node;
 import org.apache.nifi.cluster.node.Node.Status;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.remote.cluster.NodeInformant;
 import org.apache.nifi.reporting.BulletinRepository;
@@ -49,14 +48,6 @@ import org.apache.nifi.reporting.BulletinRepository;
 public interface ClusterManager extends NodeInformant {
 
     /**
-     * Handles a node's heartbeat.
-     *
-     * @param heartbeat a heartbeat
-     *
-     */
-    void handleHeartbeat(Heartbeat heartbeat);
-
-    /**
      * @param statuses the statuses of the nodes
      * @return the set of nodes
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index aefd307..d4ea1d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -33,16 +33,12 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.Queue;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
@@ -71,9 +67,11 @@ import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
-import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.context.ClusterContext;
 import org.apache.nifi.cluster.context.ClusterContextImpl;
+import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
+import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.event.Event;
 import org.apache.nifi.cluster.event.EventManager;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
@@ -104,7 +102,6 @@ import org.apache.nifi.cluster.node.Node;
 import org.apache.nifi.cluster.node.Node.Status;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
@@ -113,12 +110,9 @@ import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListene
 import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManagerProvider;
@@ -290,7 +284,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public static final String BULLETIN_CATEGORY = "Clustering";
 
     private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(WebClusterManager.class));
-    private static final Logger heartbeatLogger = new NiFiLog(LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"));
 
     /**
      * The HTTP header to store a cluster context. An example of what may be stored in the context is a node's auditable actions in response to a cluster request. The cluster context is serialized
@@ -385,10 +378,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     private final ClusterManagerProtocolSenderListener senderListener;
     private final OptimisticLockingManager optimisticLockingManager;
     private final StringEncryptor encryptor;
-    private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>();
     private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock(true);
     private final ClusterManagerLock readLock = new ClusterManagerLock(resourceRWLock.readLock(), "Read");
     private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write");
+    private final ClusterProtocolHeartbeatMonitor heartbeatMonitor;
+    private final WebClusterManagerCoordinator clusterCoordinator;
 
     private final Set<Node> nodes = new HashSet<>();
     private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
@@ -396,8 +390,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     // null means the dataflow should be read from disk
     private StandardDataFlow cachedDataFlow = null;
     private NodeIdentifier primaryNodeId = null;
-    private Timer heartbeatMonitor;
-    private Timer heartbeatProcessor;
     private volatile ClusterServicesBroadcaster servicesBroadcaster = null;
     private volatile EventManager eventManager = null;
     private volatile ClusterNodeFirewall clusterFirewall = null;
@@ -492,6 +484,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
 
         controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider);
+
+        clusterCoordinator = new WebClusterManagerCoordinator(this, senderListener);
+        heartbeatMonitor = new ClusterProtocolHeartbeatMonitor(clusterCoordinator, properties);
+        senderListener.addHandler(heartbeatMonitor);
     }
 
     public void start() throws IOException {
@@ -502,13 +498,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             }
 
             try {
-                // setup heartbeat monitoring
-                heartbeatMonitor = new Timer("Heartbeat Monitor", /* is daemon */ true);
-                heartbeatMonitor.schedule(new HeartbeatMonitoringTimerTask(), 0, getHeartbeatMonitoringIntervalSeconds() * 1000);
-
-                heartbeatProcessor = new Timer("Process Pending Heartbeats", true);
-                final int processPendingHeartbeatDelay = 1000 * Math.max(1, getClusterProtocolHeartbeatSeconds() / 2);
-                heartbeatProcessor.schedule(new ProcessPendingHeartbeatsTask(), processPendingHeartbeatDelay, processPendingHeartbeatDelay);
+                heartbeatMonitor.start();
 
                 // start request replication service
                 httpRequestReplicator.start();
@@ -572,16 +562,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
             boolean encounteredException = false;
 
-            // stop the heartbeat monitoring
-            if (isHeartbeatMonitorRunning()) {
-                heartbeatMonitor.cancel();
-                heartbeatMonitor = null;
-            }
-
-            if (heartbeatProcessor != null) {
-                heartbeatProcessor.cancel();
-                heartbeatProcessor = null;
-            }
+            heartbeatMonitor.stop();
 
             // stop the HTTP request replicator service
             if (httpRequestReplicator.isRunning()) {
@@ -628,8 +609,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public boolean isRunning() {
         readLock.lock();
         try {
-            return isHeartbeatMonitorRunning()
-                    || httpRequestReplicator.isRunning()
+            return httpRequestReplicator.isRunning()
                     || senderListener.isRunning()
                     || dataFlowManagementService.isRunning()
                     || isBroadcasting();
@@ -640,10 +620,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     @Override
     public boolean canHandle(ProtocolMessage msg) {
-        return MessageType.CONNECTION_REQUEST == msg.getType()
-                || MessageType.HEARTBEAT == msg.getType()
-                || MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType()
-                || MessageType.RECONNECTION_FAILURE == msg.getType();
+        return MessageType.CONNECTION_REQUEST == msg.getType();
     }
 
     @Override
@@ -651,31 +628,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         switch (protocolMessage.getType()) {
             case CONNECTION_REQUEST:
                 return handleConnectionRequest((ConnectionRequestMessage) protocolMessage);
-            case HEARTBEAT:
-                final HeartbeatMessage heartbeatMessage = (HeartbeatMessage) protocolMessage;
-
-                final Heartbeat original = heartbeatMessage.getHeartbeat();
-                final NodeIdentifier originalNodeId = original.getNodeIdentifier();
-                final Heartbeat heartbeatWithDn = new Heartbeat(addRequestorDn(originalNodeId, heartbeatMessage.getRequestorDN()), original.isPrimary(), original.isConnected(), original.getPayload());
-
-                handleHeartbeat(heartbeatWithDn);
-                return null;
-            case CONTROLLER_STARTUP_FAILURE:
-                new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        handleControllerStartupFailure((ControllerStartupFailureMessage) protocolMessage);
-                    }
-                }, "Handle Controller Startup Failure Message from " + ((ControllerStartupFailureMessage) protocolMessage).getNodeId()).start();
-                return null;
-            case RECONNECTION_FAILURE:
-                new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        handleReconnectionFailure((ReconnectionFailureMessage) protocolMessage);
-                    }
-                }, "Handle Reconnection Failure Message from " + ((ReconnectionFailureMessage) protocolMessage).getNodeId()).start();
-                return null;
             default:
                 throw new ProtocolException("No handler defined for message type: " + protocolMessage.getType());
         }
@@ -758,16 +710,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 addEvent(node.getNodeId(), "Connection requested from new node.  Setting status to connecting.");
                 nodes.add(node);
             } else {
-                node.setStatus(Status.CONNECTING);
+                clusterCoordinator.updateNodeStatus(node, Status.CONNECTING);
                 addEvent(resolvedNodeIdentifier, "Connection requested from existing node.  Setting status to connecting");
             }
 
             // record the time of the connection request
             node.setConnectionRequestedTimestamp(new Date().getTime());
 
-            // clear out old heartbeat info
-            node.setHeartbeat(null);
-
             // try to obtain a current flow
             if (dataFlowManagementService.isFlowCurrent()) {
                 // if a cached copy does not exist, load it from disk
@@ -777,17 +726,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     primaryNodeId = clusterDataFlow.getPrimaryNodeId();
                 }
 
-                // determine if this node should be assigned the primary role
-                final boolean primaryRole;
-                if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) {
-                    setPrimaryNodeId(node.getNodeId());
-                    addEvent(node.getNodeId(), "Setting primary role in connection response.");
-                    primaryRole = true;
-                } else {
-                    primaryRole = false;
-                }
-
-                return new ConnectionResponse(node.getNodeId(), cachedDataFlow, primaryRole, remoteInputPort, remoteCommsSecure, instanceId);
+                return new ConnectionResponse(node.getNodeId(), cachedDataFlow, remoteInputPort, remoteCommsSecure, instanceId);
             }
 
             /*
@@ -848,9 +787,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 throw new IllegalNodeReconnectionException("Node must be disconnected before it can reconnect.");
             }
 
-            // clear out old heartbeat info
-            node.setHeartbeat(null);
-
             // get the dataflow to send with the reconnection request
             if (!dataFlowManagementService.isFlowCurrent()) {
                 /* node remains disconnected */
@@ -867,7 +803,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 primaryNodeId = clusterDataFlow.getPrimaryNodeId();
             }
 
-            node.setStatus(Status.CONNECTING);
+            clusterCoordinator.updateNodeStatus(node, Status.CONNECTING);
             addEvent(node.getNodeId(), "Reconnection requested for node.  Setting status to connecting.");
 
             // determine if this node should be assigned the primary role
@@ -889,7 +825,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         } catch (final Exception ex) {
             logger.warn("Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + ex, ex);
 
-            node.setStatus(Status.DISCONNECTED);
+            clusterCoordinator.updateNodeStatus(node, Status.DISCONNECTED);
             final String eventMsg = "Problem encountered issuing reconnection request. Node will remain disconnected: " + ex;
             addEvent(node.getNodeId(), eventMsg);
             addBulletin(node, Severity.WARNING, eventMsg);
@@ -1214,7 +1150,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             if (node == null) {
                 throw new UnknownNodeException("Node does not exist.");
             }
-            requestDisconnection(node.getNodeId(), /* ignore last node */ false, "User " + userDn + " Disconnected Node");
+
+            clusterCoordinator.requestNodeDisconnect(node.getNodeId(), DisconnectionCode.USER_DISCONNECTED, "User " + userDn + " Disconnected Node");
         } finally {
             writeLock.unlock("requestDisconnection(String)");
         }
@@ -1246,7 +1183,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
      * true.
      * @throws NodeDisconnectionException if the disconnection message fails to be sent.
      */
-    private void requestDisconnection(final NodeIdentifier nodeId, final boolean ignoreNodeChecks, final String explanation)
+    void requestDisconnection(final NodeIdentifier nodeId, final boolean ignoreNodeChecks, final String explanation)
             throws IllegalNodeDisconnectionException, NodeDisconnectionException {
 
         writeLock.lock();
@@ -1277,14 +1214,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 // cannot disconnect the last connected node in the cluster
                 if (connectedNodes.size() == 1 && connectedNodes.iterator().next().equals(nodeId)) {
                     throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the only connected node in the cluster.");
-                } else if (isPrimaryNode(nodeId)) {
-                    // cannot disconnect the primary node in the cluster
-                    throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the primary node in the cluster.");
                 }
             }
 
             // update status
-            node.setStatus(Status.DISCONNECTED);
+            clusterCoordinator.updateNodeStatus(node, Status.DISCONNECTED);
             notifyDataFlowManagementServiceOfNodeStatusChange();
 
             // issue the disconnection
@@ -1296,6 +1230,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             senderListener.disconnect(request);
             addEvent(nodeId, "Node disconnected due to " + explanation);
             addBulletin(node, Severity.INFO, "Node disconnected due to " + explanation);
+
+            heartbeatMonitor.removeHeartbeat(nodeId);
         } finally {
             writeLock.unlock("requestDisconnection(NodeIdentifier, boolean)");
         }
@@ -1318,36 +1254,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return responseMessage;
     }
 
-    private void handleControllerStartupFailure(final ControllerStartupFailureMessage msg) {
-        writeLock.lock();
-        try {
-            final Node node = getRawNode(msg.getNodeId().getId());
-            if (node != null) {
-                node.setStatus(Status.DISCONNECTED);
-                addEvent(msg.getNodeId(), "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported "
-                        + "the following error: " + msg.getExceptionMessage());
-                addBulletin(node, Severity.ERROR, "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node "
-                        + "reported the following error: " + msg.getExceptionMessage());
-            }
-        } finally {
-            writeLock.unlock("handleControllerStartupFailure");
-        }
-    }
-
-    private void handleReconnectionFailure(final ReconnectionFailureMessage msg) {
-        writeLock.lock();
-        try {
-            final Node node = getRawNode(msg.getNodeId().getId());
-            if (node != null) {
-                node.setStatus(Status.DISCONNECTED);
-                final String errorMsg = "Node could not rejoin cluster. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage();
-                addEvent(msg.getNodeId(), errorMsg);
-                addBulletin(node, Severity.ERROR, errorMsg);
-            }
-        } finally {
-            writeLock.unlock("handleControllerStartupFailure");
-        }
-    }
 
     @Override
     public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
@@ -1623,176 +1529,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     }
 
 
-    /**
-     * Handles a node's heartbeat. If this heartbeat is a node's first heartbeat since its connection request, then the manager will mark the node as connected. If the node was previously disconnected
-     * due to a lack of heartbeat, then a reconnection request is issued. If the node was disconnected for other reasons, then a disconnection request is issued. If this instance is configured with a
-     * firewall and the heartbeat is blocked, then a disconnection request is issued.
-     */
-    @Override
-    public void handleHeartbeat(final Heartbeat heartbeat) {
-        // sanity check heartbeat
-        if (heartbeat == null) {
-            throw new IllegalArgumentException("Heartbeat may not be null.");
-        } else if (heartbeat.getNodeIdentifier() == null) {
-            throw new IllegalArgumentException("Heartbeat does not contain a node ID.");
-        }
-
-        /*
-         * Processing a heartbeat requires a write lock, which may take a while
-         * to obtain.  Only the last heartbeat is necessary to process per node.
-         * Futhermore, since many could pile up, heartbeats are processed in
-         * bulk.
-         * The below queue stores the pending heartbeats.
-         */
-        pendingHeartbeats.add(heartbeat);
-    }
-
-    private void processPendingHeartbeats() {
-        Node node;
-
-        writeLock.lock();
-        try {
-            /*
-             * Get the most recent heartbeats for the nodes in the cluster.  This
-             * is achieved by "draining" the pending heartbeats queue, populating
-             * a map that associates a node identifier with its latest heartbeat, and
-             * finally, getting the values of the map.
-             */
-            final Map<NodeIdentifier, Heartbeat> mostRecentHeartbeatsMap = new HashMap<>();
-            Heartbeat aHeartbeat;
-            while ((aHeartbeat = pendingHeartbeats.poll()) != null) {
-                mostRecentHeartbeatsMap.put(aHeartbeat.getNodeIdentifier(), aHeartbeat);
-            }
-            final Collection<Heartbeat> mostRecentHeartbeats = new ArrayList<>(mostRecentHeartbeatsMap.values());
-
-            // return fast if no work to do
-            if (mostRecentHeartbeats.isEmpty()) {
-                return;
-            }
-
-            logNodes("Before Heartbeat Processing", heartbeatLogger);
-
-            final int numPendingHeartbeats = mostRecentHeartbeats.size();
-            if (heartbeatLogger.isDebugEnabled()) {
-                heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, numPendingHeartbeats > 1 ? "s" : ""));
-            }
-
-            for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
-                try {
-                    // resolve the proposed node identifier to valid node identifier
-                    final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(mostRecentHeartbeat.getNodeIdentifier());
-
-                    // get a raw reference to the node (if it doesn't exist, node will be null)
-                    node = getRawNode(resolvedNodeIdentifier.getId());
-
-                    final boolean heartbeatIndicatesNotYetConnected = !mostRecentHeartbeat.isConnected();
-
-                    if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
-                        if (node == null) {
-                            logger.info("Firewall blocked heartbeat received from unknown node " + resolvedNodeIdentifier + ".  Issuing disconnection request.");
-                        } else {
-                            // record event
-                            addEvent(resolvedNodeIdentifier, "Firewall blocked received heartbeat.  Issuing disconnection request.");
-                        }
-
-                        // request node to disconnect
-                        requestDisconnectionQuietly(resolvedNodeIdentifier, "Blocked By Firewall");
-
-                    } else if (node == null) {  // unknown node, so issue reconnect request
-                        // create new node and add to node set
-                        final Node newNode = new Node(resolvedNodeIdentifier, Status.DISCONNECTED);
-                        nodes.add(newNode);
-
-                        // record event
-                        addEvent(newNode.getNodeId(), "Received heartbeat from unknown node.  Issuing reconnection request.");
-
-                        // record heartbeat
-                        newNode.setHeartbeat(mostRecentHeartbeat);
-                        requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
-                    } else if (heartbeatIndicatesNotYetConnected) {
-                        if (Status.CONNECTED == node.getStatus()) {
-                            // record event
-                            addEvent(node.getNodeId(), "Received heartbeat from node that thinks it is not yet part of the cluster, though the Manager thought it "
-                                    + "was. Marking as Disconnected and issuing reconnection request.");
-
-                            // record heartbeat
-                            node.setHeartbeat(null);
-                            node.setStatus(Status.DISCONNECTED);
-
-                            requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
-                        }
-                    } else if (Status.DISCONNECTED == node.getStatus()) {
-                        // ignore heartbeats from nodes disconnected by means other than lack of heartbeat, unless it is
-                        // the only node. We allow it if it is the only node because if we have a one-node cluster, then
-                        // we cannot manually reconnect it.
-                        if (node.isHeartbeatDisconnection() || nodes.size() == 1) {
-                            // record event
-                            if (node.isHeartbeatDisconnection()) {
-                                addEvent(resolvedNodeIdentifier, "Received heartbeat from node previously disconnected due to lack of heartbeat.  Issuing reconnection request.");
-                            } else {
-                                addEvent(resolvedNodeIdentifier, "Received heartbeat from node previously disconnected, but it is the only known node, so issuing reconnection request.");
-                            }
-
-                            // record heartbeat
-                            node.setHeartbeat(mostRecentHeartbeat);
-
-                            // request reconnection
-                            requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
-                        } else {
-                            // disconnected nodes should not heartbeat, so we need to issue a disconnection request
-                            heartbeatLogger.info("Ignoring received heartbeat from disconnected node " + resolvedNodeIdentifier + ".  Issuing disconnection request.");
-
-                            // request node to disconnect
-                            requestDisconnectionQuietly(resolvedNodeIdentifier, "Received Heartbeat from Node, but Manager has already marked Node as Disconnected");
-                        }
-
-                    } else if (Status.DISCONNECTING == node.getStatus()) {
-                        /* ignore spurious heartbeat */
-                    } else {  // node is either either connected or connecting
-                        // first heartbeat causes status change from connecting to connected
-                        if (Status.CONNECTING == node.getStatus()) {
-                            if (mostRecentHeartbeat.getCreatedTimestamp() < node.getConnectionRequestedTimestamp()) {
-                                heartbeatLogger.info("Received heartbeat for node " + resolvedNodeIdentifier + " but ignoring because it was generated before the node was last asked to reconnect.");
-                                continue;
-                            }
-
-                            // set status to connected
-                            node.setStatus(Status.CONNECTED);
-
-                            // record event
-                            addEvent(resolvedNodeIdentifier, "Received first heartbeat from connecting node.  Setting node to connected.");
-
-                            // notify service of updated node set
-                            notifyDataFlowManagementServiceOfNodeStatusChange();
-
-                            addBulletin(node, Severity.INFO, "Node Connected");
-                        } else {
-                            heartbeatLogger.info("Received heartbeat for node " + resolvedNodeIdentifier + ".");
-                        }
-
-                        // record heartbeat
-                        node.setHeartbeat(mostRecentHeartbeat);
-
-                        if (mostRecentHeartbeat.isPrimary()) {
-                            setPrimaryNodeId(node.getNodeId());
-                        }
-                    }
-                } catch (final Exception e) {
-                    logger.error("Failed to process heartbeat from {}:{} due to {}",
-                            mostRecentHeartbeat.getNodeIdentifier().getApiAddress(), mostRecentHeartbeat.getNodeIdentifier().getApiPort(), e.toString());
-                    if (logger.isDebugEnabled()) {
-                        logger.error("", e);
-                    }
-                }
-            }
-
-            logNodes("After Heartbeat Processing", heartbeatLogger);
-        } finally {
-            writeLock.unlock("processPendingHeartbeats");
-        }
-    }
-
-
     @Override
     public Set<Node> getNodes(final Status... statuses) {
         final Set<Status> desiredStatusSet = new HashSet<>();
@@ -2125,15 +1861,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-    private boolean isPrimaryNode(final NodeIdentifier nodeId) {
-        readLock.lock();
-        try {
-            return primaryNodeId != null && primaryNodeId.equals(nodeId);
-        } finally {
-            readLock.unlock("isPrimaryNode");
-        }
-    }
-
     private boolean isInSafeMode() {
         readLock.lock();
         try {
@@ -2143,7 +1870,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-    private void setPrimaryNodeId(final NodeIdentifier primaryNodeId) throws DaoException {
+    void setPrimaryNodeId(final NodeIdentifier primaryNodeId) throws DaoException {
         writeLock.lock();
         try {
             dataFlowManagementService.updatePrimaryNode(primaryNodeId);
@@ -3321,7 +3048,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             // create new "updated" node by cloning old node and updating status
             final Node currentNode = getRawNode(nodeResponse.getNodeId().getId());
             final Node updatedNode = currentNode.clone();
-            updatedNode.setStatus(nodeStatus);
+            clusterCoordinator.updateNodeStatus(updatedNode, nodeStatus);
 
             // map updated node to its response
             updatedNodesMap.put(updatedNode, nodeResponse);
@@ -4041,7 +3768,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     final Node node = updatedNodeEntry.getKey();
 
                     if (problematicNodeResponses.contains(nodeResponse)) {
-                        node.setStatus(Status.CONNECTED);
+                        clusterCoordinator.updateNodeStatus(node, Status.CONNECTED);
                         problematicNodeResponses.remove(nodeResponse);
                     }
                 }
@@ -4227,7 +3954,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
      *
      * @return false if the IP is listed in the firewall or if the firewall is not configured; true otherwise
      */
-    private boolean isBlockedByFirewall(final String ip) {
+    boolean isBlockedByFirewall(final String ip) {
         if (isFirewallConfigured()) {
             return !clusterFirewall.isPermissible(ip);
         } else {
@@ -4257,7 +3984,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-    private Node getRawNode(final String nodeId) {
+    Node getRawNode(final String nodeId) {
         readLock.lock();
         try {
             for (final Node node : nodes) {
@@ -4320,16 +4047,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-
-    private boolean isHeartbeatMonitorRunning() {
-        readLock.lock();
-        try {
-            return heartbeatMonitor != null;
-        } finally {
-            readLock.unlock("isHeartbeatMonitorRunning");
-        }
-    }
-
     private boolean canChangeNodeState(final String method, final URI uri) {
         return HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method);
     }
@@ -4359,87 +4076,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
 
-    private void logNodes(final String header, final Logger logger) {
-        if (logger.isTraceEnabled()) {
-            if (StringUtils.isNotBlank(header)) {
-                logger.trace(header);
-            }
-            for (final Node node : getNodes()) {
-                logger.trace(node.getNodeId() + " : " + node.getStatus());
-            }
-        }
-    }
-
-
-    /**
-     * This timer task simply processes any pending heartbeats. This timer task is not strictly needed, as HeartbeatMonitoringTimerTask will do this. However, this task is scheduled much more
-     * frequently and by processing the heartbeats more frequently, the stats that we report have less of a delay.
-     */
-    private class ProcessPendingHeartbeatsTask extends TimerTask {
-
-        @Override
-        public void run() {
-            writeLock.lock();
-            try {
-                processPendingHeartbeats();
-            } finally {
-                writeLock.unlock("Process Pending Heartbeats Task");
-            }
-        }
-    }
-
-    /**
-     * A timer task to detect nodes that have not sent a heartbeat in a while. The "problem" nodes are marked as disconnected due to lack of heartbeat by the task. No disconnection request is sent to
-     * the node. This is because either the node is not functioning in which case sending the request is futile or the node is running a bit slow. In the latter case, we'll wait for the next heartbeat
-     * and send a reconnection request when we process the heartbeat in the heartbeatHandler() method.
-     */
-    private class HeartbeatMonitoringTimerTask extends TimerTask {
-
-        @Override
-        public void run() {
-            // keep track of any status changes
-            boolean statusChanged = false;
-
-            writeLock.lock();
-            try {
-                // process all of the heartbeats before we decided to kick anyone out of the cluster.
-                logger.debug("Processing pending heartbeats...");
-                processPendingHeartbeats();
-
-                logger.debug("Executing heartbeat monitoring");
-
-                // check for any nodes that have not heartbeated in a long time
-                for (final Node node : getRawNodes(Status.CONNECTED)) {
-                    // return prematurely if we were interrupted
-                    if (Thread.currentThread().isInterrupted()) {
-                        return;
-                    }
-
-                    // check if we received a recent heartbeat, changing status to disconnected if necessary
-                    final long lastHeardTimestamp = node.getHeartbeat().getCreatedTimestamp();
-                    final int heartbeatGapSeconds = (int) (new Date().getTime() - lastHeardTimestamp) / 1000;
-                    if (heartbeatGapSeconds > getMaxHeartbeatGapSeconds()) {
-                        node.setHeartbeatDisconnection();
-                        addEvent(node.getNodeId(), "Node disconnected due to lack of heartbeat.");
-                        addBulletin(node, Severity.WARNING, "Node disconnected due to lack of heartbeat");
-                        statusChanged = true;
-                    }
-                }
-
-                // if a status change occurred, make the necessary updates
-                if (statusChanged) {
-                    logNodes("Heartbeat Monitoring disconnected node(s)", logger);
-                    // notify service of updated node set
-                    notifyDataFlowManagementServiceOfNodeStatusChange();
-                } else {
-                    logNodes("Heartbeat Monitoring determined all nodes are healthy", logger);
-                }
-            } catch (final Exception ex) {
-                logger.warn("Heartbeat monitor experienced exception while monitoring: " + ex, ex);
-            } finally {
-                writeLock.unlock("HeartbeatMonitoringTimerTask");
-            }
-        }
+    public NodeHeartbeat getLatestHeartbeat(final NodeIdentifier nodeId) {
+        return heartbeatMonitor.getLatestHeartbeat(nodeId);
     }
 
     @Override
@@ -4449,16 +4087,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             final Collection<NodeInformation> nodeInfos = new ArrayList<>();
             for (final Node node : getRawNodes(Status.CONNECTED)) {
                 final NodeIdentifier id = node.getNodeId();
-                final HeartbeatPayload heartbeat = node.getHeartbeatPayload();
-                if (heartbeat == null) {
-                    continue;
-                }
 
                 final Integer siteToSitePort = id.getSiteToSitePort();
                 if (siteToSitePort == null) {
                     continue;
                 }
-                final int flowFileCount = (int) heartbeat.getTotalFlowFileCount();
+
+                final NodeHeartbeat nodeHeartbeat = heartbeatMonitor.getLatestHeartbeat(id);
+                final int flowFileCount = nodeHeartbeat == null ? 0 : nodeHeartbeat.getFlowFileCount();
                 final NodeInformation nodeInfo = new NodeInformation(id.getSiteToSiteAddress(), siteToSitePort, id.getApiPort(),
                     id.isSiteToSiteSecure(), flowFileCount);
                 nodeInfos.add(nodeInfo);
@@ -4626,4 +4262,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
         return controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
     }
+
+    public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String message) {
+        bulletinRepository.addBulletin(BulletinFactory.createBulletin(nodeId == null ? "Cluster" : nodeId.getId(), severity.name(), message));
+        if (nodeId != null) {
+            addEvent(nodeId, message);
+        }
+    }
 }


[2/4] nifi git commit: NIFI-1678: Started refactoring heartbeating mechanism, using a new package: org.apache.nifi.cluster.coordination

Posted by mc...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
new file mode 100644
index 0000000..fc75601
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.manager.impl;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.node.Node.Status;
+import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WebClusterManagerCoordinator implements ClusterCoordinator {
+    private static final Logger logger = LoggerFactory.getLogger(WebClusterManagerCoordinator.class);
+    private static final AtomicLong nodeStatusIdGenerator = new AtomicLong(0L);
+
+    private final WebClusterManager clusterManager;
+    private final ClusterManagerProtocolSender protocolSender;
+
+    public WebClusterManagerCoordinator(final WebClusterManager clusterManager, final ClusterManagerProtocolSender protocolSender) {
+        this.clusterManager = clusterManager;
+        this.protocolSender = protocolSender;
+    }
+
+    @Override
+    public void requestNodeConnect(final NodeIdentifier nodeId) {
+        final Node node = clusterManager.getRawNode(nodeId.getId());
+
+        if (node == null) {
+            final ConnectionRequest connectionRequest = new ConnectionRequest(nodeId);
+            clusterManager.requestConnection(connectionRequest);
+        } else {
+            updateNodeStatus(nodeId, new NodeConnectionStatus(DisconnectionCode.NOT_YET_CONNECTED, "Requesting that Node Connect to the Cluster"));
+            clusterManager.requestReconnection(nodeId.getId(), "Anonymous");
+        }
+    }
+
+    @Override
+    public void finishNodeConnection(final NodeIdentifier nodeId) {
+        final boolean updated = updateNodeStatus(nodeId, new NodeConnectionStatus(NodeConnectionState.CONNECTED));
+        if (!updated) {
+            logger.error("Attempting to Finish Node Connection but could not find Node with Identifier {}", nodeId);
+        }
+    }
+
+    @Override
+    public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+        try {
+            clusterManager.requestDisconnection(nodeId, false, explanation);
+
+            if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT) {
+                final Node node = clusterManager.getRawNode(nodeId.getId());
+                if (node != null) {
+                    updateNodeStatus(node, Status.DISCONNECTED, true);
+                }
+            }
+        } catch (final Exception e) {
+            logger.error("Failed to request node {} disconnect from cluster due to {}", nodeId, e);
+            logger.error("", e);
+        }
+    }
+
+    @Override
+    public void disconnectionRequestedByNode(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+        updateNodeStatus(nodeId, new NodeConnectionStatus(disconnectionCode, explanation));
+
+        final Severity severity;
+        switch (disconnectionCode) {
+            case STARTUP_FAILURE:
+            case MISMATCHED_FLOWS:
+            case UNKNOWN:
+                severity = Severity.ERROR;
+                break;
+            default:
+                severity = Severity.INFO;
+                break;
+        }
+
+        reportEvent(nodeId, severity, "Node disconnected from cluster due to " + explanation);
+    }
+
+    @Override
+    public NodeConnectionStatus getConnectionStatus(final NodeIdentifier nodeId) {
+        final Node node = clusterManager.getNode(nodeId.getId());
+        if (node == null) {
+            return null;
+        }
+
+        final Status status = node.getStatus();
+        final NodeConnectionState connectionState = NodeConnectionState.valueOf(status.name());
+        return new NodeConnectionStatus(connectionState, node.getConnectionRequestedTimestamp());
+    }
+
+    @Override
+    public Set<NodeIdentifier> getNodeIdentifiers(final NodeConnectionState state) {
+        final Status status = Status.valueOf(state.name());
+        final Set<Node> nodes = clusterManager.getNodes(status);
+        return nodes.stream()
+            .map(node -> node.getNodeId())
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public boolean isBlockedByFirewall(final String hostname) {
+        return clusterManager.isBlockedByFirewall(hostname);
+    }
+
+    @Override
+    public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String event) {
+        final String messagePrefix = nodeId == null ? "" : nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- ";
+        switch (severity) {
+            case INFO:
+                logger.info(messagePrefix + event);
+                break;
+            case WARNING:
+                logger.warn(messagePrefix + event);
+                break;
+            case ERROR:
+                logger.error(messagePrefix + event);
+                break;
+        }
+
+        clusterManager.reportEvent(nodeId, severity, messagePrefix + event);
+    }
+
+    @Override
+    public void setPrimaryNode(final NodeIdentifier nodeId) {
+        clusterManager.setPrimaryNodeId(nodeId);
+    }
+
+    @Override
+    public NodeIdentifier getNodeIdentifier(final String uuid) {
+        final Node node = clusterManager.getNode(uuid);
+        return node == null ? null : node.getNodeId();
+    }
+
+
+    /**
+     * Updates the status of the node with the given ID to the given status and returns <code>true</code>
+     * if successful, <code>false</code> if no node exists with the given ID
+     *
+     * @param nodeId the ID of the node whose status is changed
+     * @param status the new status of the node
+     * @return <code>true</code> if the node exists and is updated, <code>false</code> if the node does not exist
+     */
+    private boolean updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus status) {
+        final long statusUpdateId = nodeStatusIdGenerator.incrementAndGet();
+
+        final Node node = clusterManager.getRawNode(nodeId.getId());
+        if (node == null) {
+            return false;
+        }
+
+        final Status nodeStatus = Status.valueOf(status.getState().name());
+        final Status oldStatus = node.setStatus(nodeStatus);
+
+        if (nodeStatus != oldStatus) {
+            final Set<NodeIdentifier> nodesToNotify = clusterManager.getNodes(Status.CONNECTED, Status.CONNECTING).stream()
+                .map(toNotify -> toNotify.getNodeId())
+                .collect(Collectors.toSet());
+
+            final NodeStatusChangeMessage message = new NodeStatusChangeMessage();
+            message.setNodeId(nodeId);
+            message.setNodeConnectionStatus(status);
+            // TODO: When this is sent from one node to another, we need to ensure that we check the current
+            // 'revision number' on the node and include that as the Update ID because we need a way to indicate
+            // which status change event occurred first. I.e., when the status of a node is updated on any node
+            // that is not the elected leader, we need to ensure that our nodeStatusIdGenerator also is updated.
+            message.setStatusUpdateIdentifier(statusUpdateId);
+
+            protocolSender.notifyNodeStatusChange(nodesToNotify, message);
+        }
+
+        return true;
+    }
+
+    /**
+     * Updates the status of the given node to the given new status. This method exists only because the NCM currently handles
+     * some of the status changing and we want it to call into this coordinator instead to change the status.
+     *
+     * @param rawNode the node whose status should be updated
+     * @param nodeStatus the new status of the node
+     */
+    void updateNodeStatus(final Node rawNode, final Status nodeStatus) {
+        // TODO: Remove this method when NCM is removed
+        updateNodeStatus(rawNode, nodeStatus, false);
+    }
+
+
+    /**
+     * Updates the status of the given node to the given new status. This method exists only because the NCM currently handles
+     * some of the status changing and we want it to call into this coordinator instead to change the status.
+     *
+     * @param rawNode the node whose status should be updated
+     * @param nodeStatus the new status of the node
+     * @param heartbeatDisconnect indicates whether or not the node is being disconnected due to lack of heartbeat
+     */
+    void updateNodeStatus(final Node rawNode, final Status nodeStatus, final boolean heartbeatDisconnect) {
+        // TODO: Remove this method when NCM is removed.
+        final long statusUpdateId = nodeStatusIdGenerator.incrementAndGet();
+        final Status oldStatus;
+        if (heartbeatDisconnect) {
+            oldStatus = rawNode.setHeartbeatDisconnection();
+        } else {
+            oldStatus = rawNode.setStatus(nodeStatus);
+        }
+
+        if (nodeStatus != oldStatus) {
+            final Set<NodeIdentifier> nodesToNotify = clusterManager.getNodes(Status.CONNECTED, Status.CONNECTING).stream()
+                .map(toNotify -> toNotify.getNodeId())
+                .collect(Collectors.toSet());
+
+            final NodeStatusChangeMessage message = new NodeStatusChangeMessage();
+            message.setNodeId(rawNode.getNodeId());
+            message.setNodeConnectionStatus(new NodeConnectionStatus(NodeConnectionState.valueOf(nodeStatus.name())));
+            message.setStatusUpdateIdentifier(statusUpdateId);
+
+            protocolSender.notifyNodeStatusChange(nodesToNotify, message);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
index 3bb3c1a..6f71834 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
@@ -18,12 +18,7 @@ package org.apache.nifi.cluster.node;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.nifi.cluster.HeartbeatPayload;
-import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Represents a connected flow controller. Nodes always have an immutable identifier and a status. The status may be changed, but never null.
@@ -35,8 +30,6 @@ import org.slf4j.LoggerFactory;
  */
 public class Node implements Cloneable, Comparable<Node> {
 
-    private static final Logger lockLogger = LoggerFactory.getLogger("cluster.lock");
-
     /**
      * The semantics of a Node status are as follows:
      * <ul>
@@ -66,15 +59,6 @@ public class Node implements Cloneable, Comparable<Node> {
      */
     private Status status;
 
-    /**
-     * the last heartbeat received by from the node
-     */
-    private Heartbeat lastHeartbeat;
-
-    /**
-     * the payload of the last heartbeat received from the node
-     */
-    private HeartbeatPayload lastHeartbeatPayload;
 
     /**
      * the last time the connection for this node was requested
@@ -101,40 +85,6 @@ public class Node implements Cloneable, Comparable<Node> {
     }
 
     /**
-     * Returns the last received heartbeat or null if no heartbeat has been set.
-     *
-     * @return a heartbeat or null
-     */
-    public Heartbeat getHeartbeat() {
-        return lastHeartbeat;
-    }
-
-    public HeartbeatPayload getHeartbeatPayload() {
-        return lastHeartbeatPayload;
-    }
-
-    /**
-     * Sets the last heartbeat received.
-     *
-     * @param heartbeat a heartbeat
-     *
-     * @throws ProtocolException if the heartbeat's payload failed unmarshalling
-     */
-    public void setHeartbeat(final Heartbeat heartbeat) throws ProtocolException {
-        this.lastHeartbeat = heartbeat;
-        if (this.lastHeartbeat == null) {
-            this.lastHeartbeatPayload = null;
-        } else {
-            final byte[] payload = lastHeartbeat.getPayload();
-            if (payload == null || payload.length == 0) {
-                this.lastHeartbeatPayload = null;
-            } else {
-                this.lastHeartbeatPayload = HeartbeatPayload.unmarshal(payload);
-            }
-        }
-    }
-
-    /**
      * Returns the time of the last received connection request for this node.
      *
      * @return the time when the connection request for this node was received.
@@ -166,34 +116,38 @@ public class Node implements Cloneable, Comparable<Node> {
     /**
      * Sets the status to disconnected and flags the node as being disconnected by lack of heartbeat.
      */
-    public void setHeartbeatDisconnection() {
-        setStatus(Status.DISCONNECTED);
+    public Status setHeartbeatDisconnection() {
+        final Status oldStatus = setStatus(Status.DISCONNECTED);
         heartbeatDisconnection = true;
+        return oldStatus;
     }
 
     /**
      * @return the status
      */
-    public Status getStatus() {
+    public synchronized Status getStatus() {
         return status;
     }
 
     /**
+     * Updates the status to the given value, returning the previous status
+     *
      * @param status a status
+     * @return the previous status for the node
      */
-    public void setStatus(final Status status) {
+    public synchronized Status setStatus(final Status status) {
         if (status == null) {
             throw new IllegalArgumentException("Status may not be null.");
         }
+        final Status oldStatus = this.status;
         this.status = status;
         heartbeatDisconnection = false;
+        return oldStatus;
     }
 
     @Override
     public Node clone() {
         final Node clone = new Node(nodeId, status);
-        clone.lastHeartbeat = lastHeartbeat;
-        clone.lastHeartbeatPayload = lastHeartbeatPayload;
         clone.heartbeatDisconnection = heartbeatDisconnection;
         clone.connectionRequestedTimestamp = connectionRequestedTimestamp;
         return clone;

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
new file mode 100644
index 0000000..e218e05
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAbstractHeartbeatMonitor {
+    private TestingServer zkServer;
+    private NodeIdentifier nodeId;
+    private TestFriendlyHeartbeatMonitor monitor;
+
+    @Before
+    public void setup() throws Exception {
+        zkServer = new TestingServer(true);
+        zkServer.start();
+        nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", null, false);
+    }
+
+    @After
+    public void clear() throws IOException {
+        if (zkServer != null) {
+            zkServer.stop();
+            zkServer.close();
+        }
+
+        if (monitor != null) {
+            monitor.stop();
+        }
+    }
+
+    /**
+     * Verifies that a node that sends a heartbeat that indicates that it is 'connected' is asked to connect to
+     * cluster if the cluster coordinator does not know about the node
+     *
+     * @throws InterruptedException if interrupted
+     */
+    @Test
+    public void testNewConnectedHeartbeatFromUnknownNode() throws IOException, InterruptedException {
+        final List<NodeIdentifier> requestedToConnect = Collections.synchronizedList(new ArrayList<>());
+        final ClusterCoordinatorAdapter coordinator = new ClusterCoordinatorAdapter() {
+            @Override
+            public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
+                requestedToConnect.add(nodeId);
+            }
+        };
+
+        final TestFriendlyHeartbeatMonitor monitor = createMonitor(coordinator);
+
+        // Ensure that we request the Unknown Node connect to the cluster
+        final NodeHeartbeat heartbeat = createHeartbeat(nodeId, NodeConnectionState.CONNECTED);
+        monitor.addHeartbeat(heartbeat);
+        monitor.waitForProcessed();
+
+        assertEquals(1, requestedToConnect.size());
+        assertEquals(nodeId, requestedToConnect.get(0));
+        assertEquals(1, coordinator.getEvents().size());
+    }
+
+    /**
+     * Verifies that a node that sends a heartbeat that indicates that it is 'connected' if previously
+     * manually disconnected, will be asked to disconnect from the cluster again.
+     *
+     * @throws InterruptedException if interrupted
+     */
+    @Test
+    public void testHeartbeatFromManuallyDisconnectedNode() throws InterruptedException {
+        final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>());
+        final Set<NodeIdentifier> requestedToDisconnect = Collections.synchronizedSet(new HashSet<>());
+        final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() {
+            @Override
+            public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
+                super.requestNodeConnect(nodeId);
+                requestedToConnect.add(nodeId);
+            }
+
+            @Override
+            public synchronized void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+                super.requestNodeDisconnect(nodeId, disconnectionCode, explanation);
+                requestedToDisconnect.add(nodeId);
+            }
+        };
+
+        final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
+
+        adapter.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, "Unit Testing");
+        monitor.addHeartbeat(createHeartbeat(nodeId, NodeConnectionState.CONNECTED));
+        monitor.waitForProcessed();
+
+        assertEquals(1, requestedToDisconnect.size());
+        assertEquals(nodeId, requestedToDisconnect.iterator().next());
+        assertTrue(requestedToConnect.isEmpty());
+    }
+
+
+    @Test
+    public void testConnectingNodeMarkedConnectedWhenHeartbeatReceived() throws InterruptedException {
+        final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>());
+        final Set<NodeIdentifier> connected = Collections.synchronizedSet(new HashSet<>());
+        final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() {
+            @Override
+            public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
+                super.requestNodeConnect(nodeId);
+                requestedToConnect.add(nodeId);
+            }
+
+            @Override
+            public synchronized void finishNodeConnection(final NodeIdentifier nodeId) {
+                super.finishNodeConnection(nodeId);
+                connected.add(nodeId);
+            }
+        };
+
+        final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
+
+        adapter.requestNodeConnect(nodeId); // set state to 'connecting'
+        requestedToConnect.clear();
+
+        monitor.addHeartbeat(createHeartbeat(nodeId, NodeConnectionState.CONNECTED));
+        monitor.waitForProcessed();
+
+        assertEquals(1, connected.size());
+        assertEquals(nodeId, connected.iterator().next());
+        assertTrue(requestedToConnect.isEmpty());
+    }
+
+
+    @Test
+    public void testDisconnectedHeartbeatOnStartup() throws InterruptedException {
+        final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>());
+        final Set<NodeIdentifier> connected = Collections.synchronizedSet(new HashSet<>());
+        final Set<NodeIdentifier> disconnected = Collections.synchronizedSet(new HashSet<>());
+        final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() {
+            @Override
+            public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
+                super.requestNodeConnect(nodeId);
+                requestedToConnect.add(nodeId);
+            }
+
+            @Override
+            public synchronized void finishNodeConnection(final NodeIdentifier nodeId) {
+                super.finishNodeConnection(nodeId);
+                connected.add(nodeId);
+            }
+
+            @Override
+            public synchronized void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+                super.requestNodeDisconnect(nodeId, disconnectionCode, explanation);
+                disconnected.add(nodeId);
+            }
+        };
+
+        final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
+
+        requestedToConnect.clear();
+
+        monitor.addHeartbeat(createHeartbeat(nodeId, DisconnectionCode.NODE_SHUTDOWN));
+        monitor.waitForProcessed();
+
+        assertTrue(connected.isEmpty());
+        assertTrue(requestedToConnect.isEmpty());
+        assertTrue(disconnected.isEmpty());
+    }
+
+    private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode) {
+        final NodeConnectionStatus status = new NodeConnectionStatus(disconnectionCode);
+        return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, false, 0, 0, 0, 0);
+    }
+
+    private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) {
+        final NodeConnectionStatus status = new NodeConnectionStatus(state);
+        return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, false, 0, 0, 0, 0);
+    }
+
+    private TestFriendlyHeartbeatMonitor createMonitor(final ClusterCoordinator coordinator) {
+        monitor = new TestFriendlyHeartbeatMonitor(coordinator, createProperties());
+        monitor.start();
+        return monitor;
+    }
+
+    private Properties createProperties() {
+        final Properties properties = new Properties();
+        properties.setProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, "10 ms");
+        properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, zkServer.getConnectString());
+        properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, "3 secs");
+        properties.setProperty(NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, "3 secs");
+        properties.setProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, "/nifi");
+        return properties;
+    }
+
+    private static class ClusterCoordinatorAdapter implements ClusterCoordinator {
+        private final Map<NodeIdentifier, NodeConnectionStatus> statuses = new HashMap<>();
+        private final List<ReportedEvent> events = new ArrayList<>();
+
+        @Override
+        public synchronized void requestNodeConnect(NodeIdentifier nodeId) {
+            statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.CONNECTING));
+        }
+
+        @Override
+        public synchronized void finishNodeConnection(NodeIdentifier nodeId) {
+            statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.CONNECTED));
+        }
+
+        @Override
+        public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
+            statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.DISCONNECTED));
+        }
+
+        @Override
+        public synchronized void disconnectionRequestedByNode(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
+            statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.DISCONNECTED));
+        }
+
+        @Override
+        public synchronized NodeConnectionStatus getConnectionStatus(NodeIdentifier nodeId) {
+            return statuses.get(nodeId);
+        }
+
+        @Override
+        public synchronized Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState state) {
+            return statuses.entrySet().stream().filter(p -> p.getValue().getState() == state).map(p -> p.getKey()).collect(Collectors.toSet());
+        }
+
+        @Override
+        public synchronized boolean isBlockedByFirewall(String hostname) {
+            return false;
+        }
+
+        @Override
+        public synchronized void reportEvent(NodeIdentifier nodeId, Severity severity, String event) {
+            events.add(new ReportedEvent(nodeId, severity, event));
+        }
+
+        @Override
+        public synchronized void setPrimaryNode(NodeIdentifier nodeId) {
+        }
+
+        synchronized List<ReportedEvent> getEvents() {
+            return new ArrayList<>(events);
+        }
+
+        @Override
+        public NodeIdentifier getNodeIdentifier(final String uuid) {
+            return statuses.keySet().stream().filter(p -> p.getId().equals(uuid)).findFirst().orElse(null);
+        }
+    }
+
+    public static class ReportedEvent {
+        private final NodeIdentifier nodeId;
+        private final Severity severity;
+        private final String event;
+
+        public ReportedEvent(NodeIdentifier nodeId, Severity severity, String event) {
+            this.nodeId = nodeId;
+            this.severity = severity;
+            this.event = event;
+        }
+
+        public NodeIdentifier getNodeId() {
+            return nodeId;
+        }
+
+        public Severity getSeverity() {
+            return severity;
+        }
+
+        public String getEvent() {
+            return event;
+        }
+    }
+
+
+    private static class TestFriendlyHeartbeatMonitor extends AbstractHeartbeatMonitor {
+        private Map<NodeIdentifier, NodeHeartbeat> heartbeats = new HashMap<>();
+        private final Object mutex = new Object();
+
+        public TestFriendlyHeartbeatMonitor(ClusterCoordinator clusterCoordinator, Properties properties) {
+            super(clusterCoordinator, properties);
+        }
+
+        @Override
+        protected synchronized Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats() {
+            return heartbeats;
+        }
+
+        @Override
+        public synchronized void monitorHeartbeats() {
+            super.monitorHeartbeats();
+
+            synchronized (mutex) {
+                mutex.notify();
+            }
+        }
+
+        synchronized void addHeartbeat(final NodeHeartbeat heartbeat) {
+            heartbeats.put(heartbeat.getNodeIdentifier(), heartbeat);
+        }
+
+        @Override
+        public synchronized void removeHeartbeat(final NodeIdentifier nodeId) {
+            heartbeats.remove(nodeId);
+        }
+
+        void waitForProcessed() throws InterruptedException {
+            synchronized (mutex) {
+                mutex.wait();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
index 1d42729..86057ff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
@@ -24,6 +24,10 @@ public class ConnectionException extends RuntimeException {
 
     private static final long serialVersionUID = -1378294897231234028L;
 
+    public ConnectionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
     public ConnectionException() {
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 75395b7..13a01bb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -61,6 +62,9 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -78,6 +82,8 @@ import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.connectable.Size;
 import org.apache.nifi.connectable.StandardConnection;
+import org.apache.nifi.controller.cluster.Heartbeater;
+import org.apache.nifi.controller.cluster.ZooKeeperHeartbeater;
 import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
@@ -299,10 +305,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      */
     private final StringEncryptor encryptor;
 
-    /**
-     * cluster protocol sender
-     */
-    private final NodeProtocolSender protocolSender;
 
     private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
     private final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
@@ -311,14 +313,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     /**
      * timer to periodically send heartbeats to the cluster
      */
-    private ScheduledFuture<?> heartbeatGeneratorFuture;
     private ScheduledFuture<?> heartbeatSenderFuture;
+    private final Heartbeater heartbeater;
 
     // guarded by FlowController lock
     /**
      * timer task to generate heartbeats
      */
-    private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null);
+    private final AtomicReference<HeartbeatSendTask> heartbeatSendTask = new AtomicReference<>(null);
 
     // guarded by rwLock
     /**
@@ -334,10 +336,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private String clusterManagerDN;
 
     // guarded by rwLock
-    /**
-     * true if connected to a cluster
-     */
-    private boolean connected;
+    private NodeConnectionStatus connectionStatus;
+    private final ConcurrentMap<NodeIdentifier, VersionedNodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
 
     // guarded by rwLock
     private String instanceId;
@@ -471,7 +471,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         this.configuredForClustering = configuredForClustering;
         this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), TimeUnit.SECONDS);
-        this.protocolSender = protocolSender;
         try {
             this.templateManager = new TemplateManager(properties.getTemplateDirectory());
         } catch (final IOException e) {
@@ -508,9 +507,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         // Initialize the Embedded ZooKeeper server, if applicable
-        if (properties.isStartEmbeddedZooKeeper()) {
+        if (properties.isStartEmbeddedZooKeeper() && configuredForClustering) {
             try {
                 zooKeeperStateServer = ZooKeeperStateServer.create(properties);
+                zooKeeperStateServer.start();
             } catch (final IOException | ConfigException e) {
                 throw new IllegalStateException("Unable to initailize Flow because NiFi was configured to start an Embedded Zookeeper server but failed to do so", e);
             }
@@ -526,11 +526,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
 
-        heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
+        heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, new NodeConnectionStatus(NodeConnectionState.DISCONNECTED, DisconnectionCode.NOT_YET_CONNECTED)));
+
         if (configuredForClustering) {
             leaderElectionManager = new CuratorLeaderElectionManager(4);
+            heartbeater = new ZooKeeperHeartbeater(protocolSender, properties);
         } else {
             leaderElectionManager = null;
+            heartbeater = null;
         }
     }
 
@@ -1003,6 +1006,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
+    public Heartbeater getHeartbeater() {
+        return heartbeater;
+    }
+
     /**
      * @return the BulletinRepository for storing and retrieving Bulletins
      */
@@ -1164,6 +1171,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 throw new IllegalStateException("Controller already stopped or still stopping...");
             }
 
+            if (heartbeater != null) {
+                sendShutdownNotification();
+            }
+
             if (leaderElectionManager != null) {
                 leaderElectionManager.stop();
             }
@@ -1253,6 +1264,39 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     /**
+     * Sends a notification to the cluster that the node was shut down.
+     */
+    private void sendShutdownNotification() {
+        // Generate a heartbeat message and publish it, indicating that we are shutting down
+        final HeartbeatMessage heartbeatMsg = createHeartbeatMessage();
+        final Heartbeat heartbeat = heartbeatMsg.getHeartbeat();
+        final byte[] hbPayload = heartbeatMsg.getHeartbeat().getPayload();
+        final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(DisconnectionCode.NODE_SHUTDOWN);
+        heartbeatMsg.setHeartbeat(new Heartbeat(heartbeat.getNodeIdentifier(), false, connectionStatus, hbPayload));
+        final Runnable sendNotification = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    heartbeater.send(heartbeatMsg);
+                } catch (IOException e) {
+                    LOG.warn("Failed to send NODE_SHUTDOWN heartbeat message. Cluster may not be notified of "
+                        + "shutdown and may have to wait for the heartbeats to time out before noticing that the node left the cluster");
+                }
+            }
+        };
+
+        final Future<?> hbFuture = processScheduler.submitFrameworkTask(sendNotification);
+        try {
+            hbFuture.get(3, TimeUnit.SECONDS);
+            LOG.info("Successfully sent Shutdown Notification to cluster");
+        } catch (final Exception e) {
+            LOG.warn("Failed to send NODE_SHUTDOWN heartbeat message in time. Cluster may not be notified of "
+                + "shutdown and may have to wait for the heartbeats to time out before noticing that the node left the cluster");
+        }
+    }
+
+
+    /**
      * Serializes the current state of the controller to the given OutputStream
      *
      * @param serializer serializer
@@ -1374,7 +1418,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
         } finally {
             writeLock.unlock();
         }
@@ -2951,14 +2995,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         writeLock.lock();
         try {
-
             stopHeartbeating();
 
-            final HeartbeatMessageGeneratorTask heartbeatMessageGeneratorTask = new HeartbeatMessageGeneratorTask();
-            heartbeatMessageGeneratorTaskRef.set(heartbeatMessageGeneratorTask);
-            heartbeatGeneratorFuture = clusterTaskExecutor.scheduleWithFixedDelay(heartbeatMessageGeneratorTask, 0, heartbeatDelaySeconds, TimeUnit.SECONDS);
-
-            heartbeatSenderFuture = clusterTaskExecutor.scheduleWithFixedDelay(new HeartbeatSendTask(protocolSender), 250, 250, TimeUnit.MILLISECONDS);
+            final HeartbeatSendTask sendTask = new HeartbeatSendTask();
+            this.heartbeatSendTask.set(sendTask);
+            heartbeatSenderFuture = clusterTaskExecutor.scheduleWithFixedDelay(sendTask, 0, heartbeatDelaySeconds, TimeUnit.SECONDS);
         } finally {
             writeLock.unlock();
         }
@@ -2986,7 +3027,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * @throws IllegalStateException if not clustered
      */
     public void stopHeartbeating() throws IllegalStateException {
-
         if (!configuredForClustering) {
             throw new IllegalStateException("Unable to stop heartbeating because heartbeating is not configured.");
         }
@@ -2997,10 +3037,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 return;
             }
 
-            if (heartbeatGeneratorFuture != null) {
-                heartbeatGeneratorFuture.cancel(false);
-            }
-
             if (heartbeatSenderFuture != null) {
                 heartbeatSenderFuture.cancel(false);
             }
@@ -3016,8 +3052,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public boolean isHeartbeating() {
         readLock.lock();
         try {
-            return heartbeatGeneratorFuture != null && !heartbeatGeneratorFuture.isCancelled()
-                && heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
+            return heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
         } finally {
             readLock.unlock();
         }
@@ -3112,6 +3147,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             boolean isChanging = false;
             if (this.clustered != clustered) {
                 isChanging = true;
+
+                if (clustered) {
+                    LOG.info("Cluster State changed from Not Clustered to Clustered");
+                } else {
+                    LOG.info("Cluster State changed from Clustered to Not Clustered");
+                }
             }
 
             // mark the new cluster status
@@ -3185,6 +3226,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                         zooKeeperStateServer.shutdown();
                     }
                     stateManagerProvider.disableClusterProvider();
+
+                    setPrimary(false);
                 }
 
                 final List<RemoteProcessGroup> remoteGroups = getGroup(getRootGroupId()).findAllRemoteProcessGroups();
@@ -3194,7 +3237,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
         } finally {
             writeLock.unlock();
         }
@@ -3230,12 +3273,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         eventDrivenWorkerQueue.setPrimary(primary);
 
         // update the heartbeat bean
-        this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
+        final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary, connectionStatus));
 
         // Emit a bulletin detailing the fact that the primary node state has changed
-        final String message = primary ? "This node has been elected Primary Node" : "This node is no longer Primary Node";
-        final Bulletin bulletin = BulletinFactory.createBulletin("Primary Node", Severity.INFO.name(), message);
-        bulletinRepository.addBulletin(bulletin);
+        if (oldBean == null || oldBean.isPrimary() != primary) {
+            final String message = primary ? "This node has been elected Primary Node" : "This node is no longer Primary Node";
+            final Bulletin bulletin = BulletinFactory.createBulletin("Primary Node", Severity.INFO.name(), message);
+            bulletinRepository.addBulletin(bulletin);
+            LOG.info(message);
+        }
     }
 
     static boolean areEqual(final String a, final String b) {
@@ -3602,19 +3648,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public boolean isConnected() {
         rwLock.readLock().lock();
         try {
-            return connected;
+            return connectionStatus.getState() == NodeConnectionState.CONNECTED;
         } finally {
             rwLock.readLock().unlock();
         }
     }
 
-    public void setConnected(final boolean connected) {
+    public void setConnectionStatus(final NodeConnectionStatus connectionStatus) {
         rwLock.writeLock().lock();
         try {
-            this.connected = connected;
+            this.connectionStatus = connectionStatus;
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
         } finally {
             rwLock.writeLock().unlock();
         }
@@ -3628,25 +3674,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             return;
         }
 
-        final HeartbeatMessageGeneratorTask task = heartbeatMessageGeneratorTaskRef.get();
+        final HeartbeatSendTask task = heartbeatSendTask.get();
         if (task != null) {
-            task.run();
+            clusterTaskExecutor.submit(task);
         }
     }
 
 
     private class HeartbeatSendTask implements Runnable {
-
-        private final NodeProtocolSender protocolSender;
         private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US);
 
-        public HeartbeatSendTask(final NodeProtocolSender protocolSender) {
-            if (protocolSender == null) {
-                throw new IllegalArgumentException("NodeProtocolSender may not be null.");
-            }
-            this.protocolSender = protocolSender;
-        }
-
         @Override
         public void run() {
             try {
@@ -3654,19 +3691,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     return;
                 }
 
-                final HeartbeatMessageGeneratorTask task = heartbeatMessageGeneratorTaskRef.get();
-                if (task == null) {
-                    return;
-                }
-
-                final HeartbeatMessage message = task.getHeartbeatMessage();
+                final HeartbeatMessage message = createHeartbeatMessage();
                 if (message == null) {
                     heartbeatLogger.debug("No heartbeat to send");
                     return;
                 }
 
                 final long sendStart = System.nanoTime();
-                protocolSender.heartbeat(message);
+                heartbeater.send(message);
+
                 final long sendNanos = System.nanoTime() - sendStart;
                 final long sendMillis = TimeUnit.NANOSECONDS.toMillis(sendNanos);
 
@@ -3679,58 +3712,44 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     heartbeatLogger.debug(usae.getMessage());
                 }
             } catch (final Throwable ex) {
-                heartbeatLogger.warn("Failed to send heartbeat to cluster manager due to: " + ex);
-                if (heartbeatLogger.isDebugEnabled()) {
-                    heartbeatLogger.warn("", ex);
-                }
+                heartbeatLogger.warn("Failed to send heartbeat due to: " + ex, ex);
             }
         }
     }
 
-    private class HeartbeatMessageGeneratorTask implements Runnable {
-
-        private final AtomicReference<HeartbeatMessage> heartbeatMessageRef = new AtomicReference<>();
-
-        @Override
-        public void run() {
-            final HeartbeatMessage heartbeatMessage = createHeartbeatMessage();
-            if (heartbeatMessage != null) {
-                heartbeatMessageRef.set(heartbeatMessage);
-            }
-        }
-
-        public HeartbeatMessage getHeartbeatMessage() {
-            return heartbeatMessageRef.getAndSet(null);
-        }
-
-        private HeartbeatMessage createHeartbeatMessage() {
-            try {
-                final HeartbeatBean bean = heartbeatBeanRef.get();
-                if (bean == null) {
-                    return null;
+    HeartbeatMessage createHeartbeatMessage() {
+        try {
+            HeartbeatBean bean = heartbeatBeanRef.get();
+            if (bean == null) {
+                readLock.lock();
+                try {
+                    final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(DisconnectionCode.NOT_YET_CONNECTED);
+                    bean = new HeartbeatBean(getGroup(getRootGroupId()), isPrimary(), connectionStatus);
+                } finally {
+                    readLock.unlock();
                 }
+            }
 
-                // create heartbeat payload
-                final HeartbeatPayload hbPayload = new HeartbeatPayload();
-                hbPayload.setSystemStartTime(systemStartTime);
-                hbPayload.setActiveThreadCount(getActiveThreadCount());
+            // create heartbeat payload
+            final HeartbeatPayload hbPayload = new HeartbeatPayload();
+            hbPayload.setSystemStartTime(systemStartTime);
+            hbPayload.setActiveThreadCount(getActiveThreadCount());
 
-                final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
-                hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
-                hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
+            final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
+            hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
+            hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
 
-                // create heartbeat message
-                final Heartbeat heartbeat = new Heartbeat(getNodeId(), bean.isPrimary(), bean.isConnected(), hbPayload.marshal());
-                final HeartbeatMessage message = new HeartbeatMessage();
-                message.setHeartbeat(heartbeat);
+            // create heartbeat message
+            final Heartbeat heartbeat = new Heartbeat(getNodeId(), bean.isPrimary(), bean.getConnectionStatus(), hbPayload.marshal());
+            final HeartbeatMessage message = new HeartbeatMessage();
+            message.setHeartbeat(heartbeat);
 
-                heartbeatLogger.debug("Generated heartbeat");
+            heartbeatLogger.debug("Generated heartbeat");
 
-                return message;
-            } catch (final Throwable ex) {
-                LOG.warn("Failed to create heartbeat due to: " + ex, ex);
-                return null;
-            }
+            return message;
+        } catch (final Throwable ex) {
+            LOG.warn("Failed to create heartbeat due to: " + ex, ex);
+            return null;
         }
     }
 
@@ -3852,16 +3871,51 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return queues;
     }
 
+    public void setNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus nodeStatus, final Long updateId) {
+        // We keep a VersionedNodeConnectionStatus as the value in our map, rather than NodeConnectionStatus.
+        // We do this because we update this based on data that is coming from the network. It is possible that we will
+        // get these notifications out-of-order because they could be sent across different TCP connections. As a result,
+        // we need to ensure that we don't update the status to an older version. The VersionedNodeConnectionStatus
+        // allows us to do this by looking at an "Update ID" that is associated with the new node status.
+        final VersionedNodeConnectionStatus versionedStatus = new VersionedNodeConnectionStatus(nodeStatus, updateId);
+
+        boolean updated = false;
+        while (!updated) {
+            VersionedNodeConnectionStatus curStatus = nodeStatuses.putIfAbsent(nodeId, versionedStatus);
+            if (curStatus == null) {
+                // There was no status before.
+                LOG.info("Status of Node {} set to {}", nodeId, nodeStatus);
+                return;
+            }
+
+            if (updateId < curStatus.getUpdateId()) {
+                LOG.debug("Received notification that status of Node {} changed to {} but the status update was old. Ignoring update.", nodeId, nodeStatus);
+                return;
+            }
+
+            updated = nodeStatuses.replace(nodeId, curStatus, versionedStatus);
+            if (updated) {
+                LOG.info("Status of {} changed from {} to {}", nodeId, curStatus.getStatus(), nodeStatus);
+                return;
+            }
+        }
+    }
+
+    public NodeConnectionStatus getNodeStatus(final NodeIdentifier nodeId) {
+        final VersionedNodeConnectionStatus versionedStatus = nodeStatuses.get(nodeId);
+        return versionedStatus == null ? null : versionedStatus.getStatus();
+    }
+
     private static class HeartbeatBean {
 
         private final ProcessGroup rootGroup;
         private final boolean primary;
-        private final boolean connected;
+        private final NodeConnectionStatus connectionStatus;
 
-        public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary, final boolean connected) {
+        public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary, final NodeConnectionStatus connectionStatus) {
             this.rootGroup = rootGroup;
             this.primary = primary;
-            this.connected = connected;
+            this.connectionStatus = connectionStatus;
         }
 
         public ProcessGroup getRootGroup() {
@@ -3872,9 +3926,56 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             return primary;
         }
 
-        public boolean isConnected() {
-            return connected;
+        public NodeConnectionStatus getConnectionStatus() {
+            return connectionStatus;
         }
     }
 
+
+    /**
+     * A simple wrapper around a Node Connection Status and an Update ID. This is used as a value in a map so that we
+     * ensure that we update that Map only with newer versions
+     */
+    private static class VersionedNodeConnectionStatus {
+        private final NodeConnectionStatus status;
+        private final Long updateId;
+
+        public VersionedNodeConnectionStatus(final NodeConnectionStatus status, final Long updateId) {
+            this.status = status;
+            this.updateId = updateId;
+        }
+
+        public NodeConnectionStatus getStatus() {
+            return status;
+        }
+
+        public Long getUpdateId() {
+            return updateId;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((status == null) ? 0 : status.hashCode());
+            result = prime * result + ((updateId == null) ? 0 : updateId.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+
+            VersionedNodeConnectionStatus other = (VersionedNodeConnectionStatus) obj;
+            return other.getStatus().equals(getStatus()) && other.getUpdateId().equals(getUpdateId());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
index 00c6c5d..c648664 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
@@ -16,11 +16,13 @@
  */
 package org.apache.nifi.controller;
 
+import org.apache.nifi.cluster.ConnectionException;
+
 /**
  * Represents the exceptional case when a controller managing an existing flow fails to fully load a different flow.
  *
  */
-public class FlowSynchronizationException extends RuntimeException {
+public class FlowSynchronizationException extends ConnectionException {
 
     private static final long serialVersionUID = 109234802938L;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 67d0338..45999a3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -41,26 +41,31 @@ import java.util.zip.GZIPOutputStream;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.ConnectionException;
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.cluster.Heartbeater;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;
@@ -341,6 +346,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             case RECONNECTION_REQUEST:
             case DISCONNECTION_REQUEST:
             case FLOW_REQUEST:
+            case NODE_STATUS_CHANGE:
                 return true;
             default:
                 return false;
@@ -354,6 +360,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             switch (request.getType()) {
                 case FLOW_REQUEST:
                     return handleFlowRequest((FlowRequestMessage) request);
+                case NODE_STATUS_CHANGE:
+                    final NodeStatusChangeMessage statusChangeMsg = (NodeStatusChangeMessage) request;
+                    controller.setNodeStatus(statusChangeMsg.getNodeId(), statusChangeMsg.getNodeConnectionStatus(), statusChangeMsg.getStatusUpdateIdentifier());
+                    return null;
                 case RECONNECTION_REQUEST:
                     // Suspend heartbeats until we've reconnected. Otherwise,
                     // we may send a heartbeat while we are still in the process of
@@ -419,7 +429,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                     // set node as clustered, since it is trying to connect to a cluster
                     controller.setClustered(true, null);
                     controller.setClusterManagerRemoteSiteInfo(null, null);
-                    controller.setConnected(false);
+                    controller.setConnectionStatus(new NodeConnectionStatus(DisconnectionCode.NOT_YET_CONNECTED));
 
                     /*
                      * Start heartbeating.  Heartbeats will fail because we can't reach
@@ -444,23 +454,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                         loadFromConnectionResponse(response);
                     } catch (final ConnectionException ce) {
                         logger.error("Failed to load flow from cluster due to: " + ce, ce);
-
-                        /*
-                         * If we failed processing the response, then we want to notify
-                         * the manager so that it can mark the node as disconnected.
-                         */
-                        // create error message
-                        final ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage();
-                        msg.setExceptionMessage(ce.getMessage());
-                        msg.setNodeId(response.getNodeIdentifier());
-
-                        // send error message to manager
-                        try {
-                            senderListener.notifyControllerStartupFailure(msg);
-                        } catch (final ProtocolException | UnknownServiceAddressException e) {
-                            logger.warn("Failed to notify cluster manager of controller startup failure due to: " + e, e);
-                        }
-
+                        handleConnectionFailure(ce);
                         throw new IOException(ce);
                     }
                 }
@@ -474,6 +468,33 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         }
     }
 
+    private void handleConnectionFailure(final Exception ex) {
+        final Heartbeater heartbeater = controller.getHeartbeater();
+        if (heartbeater != null) {
+            final HeartbeatMessage startupFailureMessage = new HeartbeatMessage();
+            final NodeConnectionStatus connectionStatus;
+            if (ex instanceof UninheritableFlowException) {
+                connectionStatus = new NodeConnectionStatus(DisconnectionCode.MISMATCHED_FLOWS, ex.toString());
+            } else if (ex instanceof FlowSynchronizationException) {
+                connectionStatus = new NodeConnectionStatus(DisconnectionCode.MISMATCHED_FLOWS, ex.toString());
+            } else {
+                connectionStatus = new NodeConnectionStatus(DisconnectionCode.STARTUP_FAILURE, ex.toString());
+            }
+
+            final byte[] payload;
+            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+                HeartbeatPayload.marshal(new HeartbeatPayload(), baos);
+                payload = baos.toByteArray();
+
+                final Heartbeat failureHeartbeat = new Heartbeat(nodeId, false, connectionStatus, payload);
+                startupFailureMessage.setHeartbeat(failureHeartbeat);
+                heartbeater.send(startupFailureMessage);
+            } catch (final Exception e) {
+                logger.error("Failed to notify Cluster Coordinator that Connection failed", e);
+            }
+        }
+    }
+
     private FlowResponseMessage handleFlowRequest(final FlowRequestMessage request) throws ProtocolException {
         readLock.lock();
         try {
@@ -509,7 +530,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             logger.info("Processing reconnection request from manager.");
 
             // reconnect
-            final ConnectionResponse connectionResponse = new ConnectionResponse(nodeId, request.getDataFlow(), request.isPrimary(),
+            final ConnectionResponse connectionResponse = new ConnectionResponse(nodeId, request.getDataFlow(),
                 request.getManagerRemoteSiteListeningPort(), request.isManagerRemoteSiteCommsSecure(), request.getInstanceId());
             connectionResponse.setClusterManagerDN(request.getRequestorDN());
             loadFromConnectionResponse(connectionResponse);
@@ -520,21 +541,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         } catch (final Exception ex) {
             // disconnect controller
             if (controller.isClustered()) {
-                disconnect();
+                disconnect("Failed to properly handle Reconnection request due to " + ex.toString());
             }
 
             logger.error("Handling reconnection request failed due to: " + ex, ex);
-
-            final ReconnectionFailureMessage failureMessage = new ReconnectionFailureMessage();
-            failureMessage.setNodeId(request.getNodeId());
-            failureMessage.setExceptionMessage(ex.toString());
-
-            // send error message to manager
-            try {
-                senderListener.notifyReconnectionFailure(failureMessage);
-            } catch (final ProtocolException | UnknownServiceAddressException e) {
-                logger.warn("Failed to notify cluster manager of controller reconnection failure due to: " + e, e);
-            }
+            handleConnectionFailure(ex);
         } finally {
             writeLock.unlock();
         }
@@ -544,20 +555,20 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         writeLock.lock();
         try {
             logger.info("Received disconnection request message from manager with explanation: " + request.getExplanation());
-            disconnect();
+            disconnect(request.getExplanation());
         } finally {
             writeLock.unlock();
         }
     }
 
-    private void disconnect() {
+    private void disconnect(final String explanation) {
         writeLock.lock();
         try {
 
             logger.info("Disconnecting node.");
 
             // mark node as not connected
-            controller.setConnected(false);
+            controller.setConnectionStatus(new NodeConnectionStatus(DisconnectionCode.UNKNOWN, explanation));
 
             // turn off primary flag
             controller.setPrimary(false);
@@ -727,7 +738,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             controller.setClustered(true, response.getInstanceId(), response.getClusterManagerDN());
             controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(), response.isManagerRemoteCommsSecure());
 
-            controller.setConnected(true);
+            controller.setConnectionStatus(new NodeConnectionStatus(NodeConnectionState.CONNECTED));
 
             // start the processors as indicated by the dataflow
             controller.onFlowInitialized(dataFlow.isAutoStartProcessors());
@@ -736,11 +747,12 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             loadSnippets(dataFlow.getSnippets());
             controller.startHeartbeating();
         } catch (final UninheritableFlowException ufe) {
-            throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow is different than cluster flow.", ufe);
+            throw new UninheritableFlowException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow is different than cluster flow.", ufe);
         } catch (final FlowSerializationException fse) {
             throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local or cluster flow is malformed.", fse);
         } catch (final FlowSynchronizationException fse) {
-            throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow controller partially updated.  Administrator should disconnect node and review flow for corruption.", fse);
+            throw new FlowSynchronizationException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow controller partially updated. "
+                + "Administrator should disconnect node and review flow for corruption.", fse);
         } catch (final Exception ex) {
             throw new ConnectionException("Failed to connect node to cluster due to: " + ex, ex);
         } finally {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
index 1b4f9d9..d7fd6d8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
@@ -16,11 +16,13 @@
  */
 package org.apache.nifi.controller;
 
+import org.apache.nifi.cluster.ConnectionException;
+
 /**
  * Represents the exceptional case when a controller is to be loaded with a flow that is fundamentally different than its existing flow.
  *
  */
-public class UninheritableFlowException extends RuntimeException {
+public class UninheritableFlowException extends ConnectionException {
 
     private static final long serialVersionUID = 198234798234794L;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/Heartbeater.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/Heartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/Heartbeater.java
new file mode 100644
index 0000000..e5379d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/Heartbeater.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+
+/**
+ * <p>
+ * A mechanism for sending a heartbeat to a remote resource to indicate
+ * that the node is still an active participant in the cluster
+ * <p>
+ */
+public interface Heartbeater extends Closeable {
+
+    /**
+     * Sends the given heartbeat to the remote resource
+     *
+     * @param heartbeat the Heartbeat to send
+     * @throws IOException if unable to communicate with the remote resource
+     */
+    void send(HeartbeatMessage heartbeat) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java
new file mode 100644
index 0000000..8571f51
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cluster;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperClientConfig {
+    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperClientConfig.class);
+
+    private final String connectString;
+    private final int sessionTimeoutMillis;
+    private final int connectionTimeoutMillis;
+    private final String rootPath;
+    private final List<ACL> acls;
+
+    private ZooKeeperClientConfig(String connectString, int sessionTimeoutMillis, int connectionTimeoutMillis, String rootPath, List<ACL> acls) {
+        this.connectString = connectString;
+        this.sessionTimeoutMillis = sessionTimeoutMillis;
+        this.connectionTimeoutMillis = connectionTimeoutMillis;
+        this.rootPath = rootPath.endsWith("/") ? rootPath.substring(0, rootPath.length() - 1) : rootPath;
+        this.acls = acls;
+    }
+
+    public String getConnectString() {
+        return connectString;
+    }
+
+    public int getSessionTimeoutMillis() {
+        return sessionTimeoutMillis;
+    }
+
+    public int getConnectionTimeoutMillis() {
+        return connectionTimeoutMillis;
+    }
+
+    public String getRootPath() {
+        return rootPath;
+    }
+
+    public List<ACL> getACLs() {
+        return acls;
+    }
+
+    public String resolvePath(final String path) {
+        if (path.startsWith("/")) {
+            return rootPath + path;
+        }
+
+        return rootPath + "/" + path;
+    }
+
+    public static ZooKeeperClientConfig createConfig(final Properties properties) {
+        final String connectString = properties.getProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING);
+        if (connectString == null || connectString.trim().isEmpty()) {
+            throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is not set in nifi.properties");
+        }
+
+        final long sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
+        final long connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT);
+        final String rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE);
+        final String accessControl = properties.getProperty(NiFiProperties.ZOOKEEPER_ACCESS_CONTROL);
+
+        final List<ACL> acls;
+        if (accessControl == null || accessControl.trim().isEmpty()) {
+            acls = null;
+        } else if (accessControl.equalsIgnoreCase("Open")) {
+            acls = Ids.OPEN_ACL_UNSAFE;
+        } else if (accessControl.equalsIgnoreCase("CreatorOnly")) {
+            acls = Ids.CREATOR_ALL_ACL;
+        } else {
+            acls = null;
+        }
+
+        try {
+            PathUtils.validatePath(rootPath);
+        } catch (final IllegalArgumentException e) {
+            throw new IllegalArgumentException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath);
+        }
+
+        return new ZooKeeperClientConfig(connectString, (int) sessionTimeoutMs, (int) connectionTimeoutMs, rootPath, acls);
+    }
+
+    private static int getTimePeriod(final Properties properties, final String propertyName, final String defaultValue) {
+        final String timeout = properties.getProperty(propertyName, defaultValue);
+        try {
+            return (int) FormatUtils.getTimeDuration(timeout, TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            logger.warn("Value of '" + propertyName + "' property is set to '" + timeout + "', which is not a valid time period. Using default of " + defaultValue);
+            return (int) FormatUtils.getTimeDuration(defaultValue, TimeUnit.MILLISECONDS);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java
new file mode 100644
index 0000000..4348cec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cluster;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryForever;
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses ZooKeeper in order to determine which node is the elected Cluster Coordinator and to indicate
+ * that this node is part of the cluster. However, once the Cluster Coordinator is known, heartbeats are
+ * sent directly to the Cluster Coordinator.
+ */
+public class ZooKeeperHeartbeater implements Heartbeater {
+    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperHeartbeater.class);
+
+    private final NodeProtocolSender protocolSender;
+    private final CuratorFramework curatorClient;
+    private final String nodesPathPrefix;
+
+    private final String coordinatorPath;
+    private volatile String coordinatorAddress;
+
+
+    public ZooKeeperHeartbeater(final NodeProtocolSender protocolSender, final Properties properties) {
+        this.protocolSender = protocolSender;
+
+        final RetryPolicy retryPolicy = new RetryForever(5000);
+        final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(properties);
+
+        curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
+            zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy);
+
+        curatorClient.start();
+        nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
+        coordinatorPath = nodesPathPrefix + "/coordinator";
+    }
+
+    private String getHeartbeatAddress() throws IOException {
+        final String curAddress = coordinatorAddress;
+        if (curAddress != null) {
+            return curAddress;
+        }
+
+        try {
+            // Get coordinator address and add watcher to change who we are heartbeating to if the value changes.
+            final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() {
+                @Override
+                public void process(final WatchedEvent event) {
+                    coordinatorAddress = null;
+                }
+            }).forPath(coordinatorPath);
+            final String address = coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8);
+
+            logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);
+            return address;
+        } catch (Exception e) {
+            throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
+        }
+    }
+
+    @Override
+    public synchronized void send(final HeartbeatMessage heartbeatMessage) throws IOException {
+        final String heartbeatAddress = getHeartbeatAddress();
+
+        try {
+            protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
+        } catch (final ProtocolException pe) {
+            // a ProtocolException is likely the result of not being able to communicate
+            // with the coordinator. If we do get an IOException communicating with the coordinator,
+            // it will be the cause of the Protocol Exception. In this case, set coordinatorAddress
+            // to null so that we double-check next time that the coordinator has not changed.
+            if (pe.getCause() instanceof IOException) {
+                coordinatorAddress = null;
+            }
+
+            throw pe;
+        }
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        if (curatorClient != null) {
+            curatorClient.close();
+        }
+
+        logger.info("ZooKeeper heartbeater closed. Will no longer send Heartbeat messages to ZooKeeper");
+    }
+}