You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:29:09 UTC

[13/14] nifi git commit: NIFI-5516: Implement Load-Balanced Connections Refactoring StandardFlowFileQueue to have an AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added documentation, cleaned up code some Refactored FlowFileQueue so th

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
index e31a547..9eaffd3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
@@ -16,15 +16,6 @@
  */
 package org.apache.nifi.cluster.protocol.impl;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.security.cert.CertificateException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolContext;
 import org.apache.nifi.cluster.protocol.ProtocolException;
@@ -49,6 +40,22 @@ import org.apache.nifi.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocket;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
+
 /**
  * Implements a listener for protocol messages sent over unicast socket.
  *
@@ -82,7 +89,6 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
 
     @Override
     public void start() throws IOException {
-
         if (super.isRunning()) {
             throw new IllegalStateException("Instance is already started.");
         }
@@ -92,7 +98,6 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
 
     @Override
     public void stop() throws IOException {
-
         if (super.isRunning() == false) {
             throw new IOException("Instance is already stopped.");
         }
@@ -128,8 +133,6 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
             final String requestId = UUID.randomUUID().toString();
             logger.debug("Received request {} from {}", requestId, hostname);
 
-            String requestorDn = getRequestorDN(socket);
-
             // unmarshall message
             final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
             final ByteCountingInputStream countingIn = new ByteCountingInputStream(socket.getInputStream());
@@ -151,7 +154,7 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
                 }
             }
 
-            request.setRequestorDN(requestorDn);
+            final Set<String> nodeIdentities = getCertificateIdentities(socket);
 
             // dispatch message to handler
             ProtocolHandler desiredHandler = null;
@@ -168,7 +171,7 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
                 logger.error("Received request of type {} but none of the following Protocol Handlers were able to process the request: {}", request.getType(), handlers);
                 throw new ProtocolException("No handler assigned to handle message type: " + request.getType());
             } else {
-                final ProtocolMessage response = desiredHandler.handle(request);
+                final ProtocolMessage response = desiredHandler.handle(request, nodeIdentities);
                 if (response != null) {
                     try {
                         logger.debug("Sending response for request {}", requestId);
@@ -218,11 +221,32 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
         }
     }
 
-    private String getRequestorDN(Socket socket) {
-        try {
-            return CertificateUtils.extractPeerDNFromSSLSocket(socket);
-        } catch (CertificateException e) {
-            throw new ProtocolException(e);
+    // Identities taken from the peer's TLS certificate; a non-SSL socket has no certificate and yields an empty set.
+    private Set<String> getCertificateIdentities(final Socket socket) throws IOException {
+        if (!(socket instanceof SSLSocket)) {
+            return Collections.emptySet();
+        }
+
+        try {
+            return getCertificateIdentities(((SSLSocket) socket).getSession());
+        } catch (final CertificateException e) {
+            throw new IOException("Could not extract Subject Alternative Names from client's certificate", e);
+        }
+    }
+
+    private Set<String> getCertificateIdentities(final SSLSession sslSession) throws CertificateException, SSLPeerUnverifiedException {
+        final Certificate[] peerChain = sslSession.getPeerCertificates();
+        if (peerChain == null || peerChain.length == 0) {
+            throw new SSLPeerUnverifiedException("No certificates found");
         }
+
+        // The peer's own certificate is first in the chain; checkValidity() throws if it is expired or not yet valid.
+        final X509Certificate peerCertificate = CertificateUtils.convertAbstractX509Certificate(peerChain[0]);
+        peerCertificate.checkValidity();
+
+        // Every Subject Alternative Name is passed through CertificateUtils.extractUsername; duplicates collapse in the Set.
+        return CertificateUtils.getSubjectAlternativeNames(peerCertificate).stream()
+            .map(CertificateUtils::extractUsername)
+            .collect(Collectors.toSet());
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
index a2d9968..dbc988b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
@@ -25,6 +25,8 @@ public class AdaptedNodeIdentifier {
     private int apiPort;
     private String socketAddress;
     private int socketPort;
+    private String loadBalanceAddress;
+    private int loadBalancePort;
     private String siteToSiteAddress;
     private Integer siteToSitePort;
     private Integer siteToSiteHttpApiPort;
@@ -74,6 +76,22 @@ public class AdaptedNodeIdentifier {
         this.socketPort = socketPort;
     }
 
+    public String getLoadBalanceAddress() { // read by NodeIdentifierAdapter when it rebuilds a NodeIdentifier
+        return loadBalanceAddress;
+    }
+
+    public void setLoadBalanceAddress(final String loadBalanceAddress) { // set from NodeIdentifier.getLoadBalanceAddress() by NodeIdentifierAdapter
+        this.loadBalanceAddress = loadBalanceAddress;
+    }
+
+    public int getLoadBalancePort() { // read by NodeIdentifierAdapter when it rebuilds a NodeIdentifier
+        return loadBalancePort;
+    }
+
+    public void setLoadBalancePort(final int loadBalancePort) { // set from NodeIdentifier.getLoadBalancePort() by NodeIdentifierAdapter
+        this.loadBalancePort = loadBalancePort;
+    }
+
     public String getSiteToSiteAddress() {
         return siteToSiteAddress;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
index 4a2660f..29aa451 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
@@ -34,6 +34,8 @@ public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, Nod
             aNi.setApiPort(ni.getApiPort());
             aNi.setSocketAddress(ni.getSocketAddress());
             aNi.setSocketPort(ni.getSocketPort());
+            aNi.setLoadBalanceAddress(ni.getLoadBalanceAddress());
+            aNi.setLoadBalancePort(ni.getLoadBalancePort());
             aNi.setSiteToSiteAddress(ni.getSiteToSiteAddress());
             aNi.setSiteToSitePort(ni.getSiteToSitePort());
             aNi.setSiteToSiteHttpApiPort(ni.getSiteToSiteHttpApiPort());
@@ -47,7 +49,7 @@ public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, Nod
         if (aNi == null) {
             return null;
         } else {
-            return new NodeIdentifier(aNi.getId(), aNi.getApiAddress(), aNi.getApiPort(), aNi.getSocketAddress(), aNi.getSocketPort(),
+            return new NodeIdentifier(aNi.getId(), aNi.getApiAddress(), aNi.getApiPort(), aNi.getSocketAddress(), aNi.getSocketPort(), aNi.getLoadBalanceAddress(), aNi.getLoadBalancePort(),
                 aNi.getSiteToSiteAddress(), aNi.getSiteToSitePort(),aNi.getSiteToSiteHttpApiPort(), aNi.isSiteToSiteSecure());
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index 1cab62f..482f5d6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -18,8 +18,6 @@ package org.apache.nifi.cluster.protocol.message;
 
 public abstract class ProtocolMessage {
 
-    private volatile String requestorDN;
-
     public static enum MessageType {
         CONNECTION_REQUEST,
         CONNECTION_RESPONSE,
@@ -42,21 +40,4 @@ public abstract class ProtocolMessage {
 
     public abstract MessageType getType();
 
-    /**
-     * Sets the DN of the entity making the request
-     *
-     * @param dn dn of the entity making the request
-     */
-    public void setRequestorDN(final String dn) {
-        this.requestorDN = dn;
-    }
-
-    /**
-     * @return the DN of the entity that made the request, if using a secure
-     * socket. Otherwise, returns <code>null</code>
-     */
-    public String getRequestorDN() {
-        return requestorDN;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
index 63ab689..e2d5bf2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
@@ -27,17 +27,17 @@
             <util:constant static-field="org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils.JAXB_CONTEXT"/>
         </constructor-arg>
     </bean>
-    
+
     <!-- socket configuration -->
     <bean id="protocolSocketConfiguration" class="org.apache.nifi.cluster.protocol.spring.SocketConfigurationFactoryBean">
         <property name="properties" ref="nifiProperties"/>
     </bean>
-    
+
     <!-- server socket configuration -->
     <bean id="protocolServerSocketConfiguration" class="org.apache.nifi.cluster.protocol.spring.ServerSocketConfigurationFactoryBean">
         <property name="properties" ref="nifiProperties"/>
     </bean>
-    
+
     <!-- cluster manager protocol sender -->
     <bean id="clusterCoordinationProtocolSender" class="org.apache.nifi.cluster.protocol.impl.StandardClusterCoordinationProtocolSender">
         <constructor-arg ref="protocolSocketConfiguration"/>
@@ -49,13 +49,13 @@
             <bean factory-bean="nifiProperties" factory-method="getClusterNodeConnectionTimeout"/>
         </property>
     </bean>
-    
+
     <!-- cluster manager sender/listener -->
     <bean id="clusterCoordinationProtocolSenderListener" class="org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener">
         <constructor-arg ref="clusterCoordinationProtocolSender"/>
         <constructor-arg ref="protocolListener"/>
     </bean>
-    
+
     <!-- node protocol sender -->
     <!--
     <bean id="nodeProtocolSender" class="org.apache.nifi.cluster.coordination.node.CuratorNodeProtocolSender">
@@ -69,7 +69,7 @@
         <constructor-arg ref="protocolContext"/>
         <constructor-arg ref="leaderElectionManager"/>
     </bean>
-    
+
     <!-- protocol listener -->
     <bean id="protocolListener" class="org.apache.nifi.cluster.protocol.impl.SocketProtocolListener">
         <constructor-arg index="0">
@@ -81,7 +81,7 @@
         <constructor-arg ref="protocolServerSocketConfiguration" index="2"/>
         <constructor-arg ref="protocolContext" index="3"/>
     </bean>
-    
+
     <!-- node sender/listener -->
     <bean id="nodeProtocolSenderListener" class="org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener">
         <constructor-arg ref="nodeProtocolSender"/>

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
index d6d83ef..aff4b11 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
@@ -16,12 +16,14 @@
  */
 package org.apache.nifi.cluster.protocol.impl.testutils;
 
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
 /**
  */
 public class DelayedProtocolHandler implements ProtocolHandler {
@@ -34,7 +36,7 @@ public class DelayedProtocolHandler implements ProtocolHandler {
     }
 
     @Override
-    public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+    public ProtocolMessage handle(ProtocolMessage msg, Set<String> nodeIdentities) throws ProtocolException {
         try {
             messages.add(msg);
             Thread.sleep(delay);

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
index ccf2c4c..05d5b77 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
@@ -16,12 +16,14 @@
  */
 package org.apache.nifi.cluster.protocol.impl.testutils;
 
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
 /**
  */
 public class ReflexiveProtocolHandler implements ProtocolHandler {
@@ -29,7 +31,7 @@ public class ReflexiveProtocolHandler implements ProtocolHandler {
     private List<ProtocolMessage> messages = new ArrayList<>();
 
     @Override
-    public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+    public ProtocolMessage handle(ProtocolMessage msg, Set<String> nodeIdentities) throws ProtocolException {
         messages.add(msg);
         return msg;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 35bf510..4395883 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -29,6 +29,7 @@ import org.apache.nifi.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -198,7 +199,7 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
         final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
 
         // Do not process heartbeat if it's blocked by firewall.
-        if (clusterCoordinator.isBlockedByFirewall(nodeId.getSocketAddress())) {
+        if (clusterCoordinator.isBlockedByFirewall(Collections.singleton(nodeId.getSocketAddress()))) {
             clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Firewall blocked received heartbeat. Issuing disconnection request.");
 
             // request node to disconnect

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index 2d6f023..43f3f2b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -16,16 +16,6 @@
  */
 package org.apache.nifi.cluster.coordination.heartbeat;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -46,6 +36,17 @@ import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 /**
  * Uses Apache ZooKeeper to advertise the address to send heartbeats to, and
  * then relies on the NiFi Cluster Protocol to receive heartbeat messages from
@@ -134,7 +135,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
     }
 
     @Override
-    public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
+    public ProtocolMessage handle(final ProtocolMessage msg, Set<String> nodeIds) throws ProtocolException {
         switch (msg.getType()) {
             case HEARTBEAT:
                 return handleHeartbeat((HeartbeatMessage) msg);

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 4e4625c..484d155 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -16,13 +16,19 @@
  */
 package org.apache.nifi.cluster.coordination.node;
 
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
 import org.apache.nifi.cluster.coordination.flow.FlowElection;
 import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
 import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
 import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
+import org.apache.nifi.cluster.coordination.node.state.NodeIdentifierDescriptor;
 import org.apache.nifi.cluster.event.Event;
 import org.apache.nifi.cluster.event.NodeEvent;
 import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
@@ -49,8 +55,14 @@ import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.controller.leader.election.LeaderElectionManager;
+import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.services.FlowService;
 import org.apache.nifi.util.NiFiProperties;
@@ -59,6 +71,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -69,6 +82,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -93,6 +107,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     private final AtomicLong latestUpdateId = new AtomicLong(-1);
     private final FlowElection flowElection;
     private final NodeProtocolSender nodeProtocolSender;
+    private final StateManager stateManager;
 
     private volatile FlowService flowService;
     private volatile boolean connected;
@@ -102,9 +117,18 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
     private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
 
+    private final List<ClusterTopologyEventListener> eventListeners = new CopyOnWriteArrayList<>();
+
+    public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager,
+                                  final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties,
+                                  final NodeProtocolSender nodeProtocolSender) throws IOException {
+        this(senderListener, eventReporter, leaderElectionManager, flowElection, firewall, revisionManager, nifiProperties, nodeProtocolSender,
+            StandardStateManagerProvider.create(nifiProperties, VariableRegistry.EMPTY_REGISTRY)); // delegates to the full constructor with a StateManagerProvider created from nifiProperties
+    }
+
     public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager,
             final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties,
-            final NodeProtocolSender nodeProtocolSender) {
+            final NodeProtocolSender nodeProtocolSender, final StateManagerProvider stateManagerProvider) throws IOException {
         this.senderListener = senderListener;
         this.flowService = null;
         this.eventReporter = eventReporter;
@@ -114,10 +138,98 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         this.leaderElectionManager = leaderElectionManager;
         this.flowElection = flowElection;
         this.nodeProtocolSender = nodeProtocolSender;
+        this.stateManager = stateManagerProvider.getStateManager("Cluster Coordinator");
+
+        recoverState();
 
         senderListener.addHandler(this);
     }
 
+    private void recoverState() throws IOException { // called from the constructor: rebuilds node statuses (and possibly the local node id) from LOCAL-scope state
+        final StateMap stateMap = stateManager.getState(Scope.LOCAL);
+        if (stateMap == null) {
+            logger.debug("No state to restore");
+            return;
+        }
+
+        final ObjectMapper mapper = new ObjectMapper();
+        final JsonFactory jsonFactory = new JsonFactory();
+        jsonFactory.setCodec(mapper); // the codec is what lets the parser bind JSON to NodeIdentifierDescriptor via readValueAs
+
+        final Map<NodeIdentifier, NodeConnectionStatus> connectionStatusMap = new HashMap<>();
+        NodeIdentifier localNodeId = null;
+
+        final Map<String, String> state = stateMap.toMap(); // key = node id as written by storeState(), value = JSON-serialized NodeIdentifierDescriptor
+        for (final Map.Entry<String, String> entry : state.entrySet()) {
+            final String nodeUuid = entry.getKey();
+            final String nodeIdentifierJson = entry.getValue();
+            logger.debug("Recovering state for {} = {}", nodeUuid, nodeIdentifierJson);
+
+            try (final JsonParser jsonParser = jsonFactory.createParser(nodeIdentifierJson)) {
+                final NodeIdentifierDescriptor nodeIdDesc = jsonParser.readValueAs(NodeIdentifierDescriptor.class);
+                final NodeIdentifier nodeId = nodeIdDesc.toNodeIdentifier();
+
+                connectionStatusMap.put(nodeId, new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED)); // every recovered node starts out as NOT_YET_CONNECTED
+                if (nodeIdDesc.isLocalNodeIdentifier()) {
+                    if (localNodeId == null) {
+                        localNodeId = nodeId;
+                    } else { // a second entry claims to be local: trust neither. NOTE(review): the message below contains the typos "tgwo" and "Identiifer"
+                        logger.warn("When recovering state, determined that tgwo Node Identifiers claim to be the local Node Identifier: {} and {}. Will ignore both of these and wait until " +
+                            "connecting to cluster to determine which Node Identiifer is the local Node Identifier", localNodeId, nodeId);
+                        localNodeId = null; // NOTE(review): after this reset a third entry flagged as local would be accepted as the local id - confirm that is intended
+                    }
+                }
+            }
+        }
+
+        if (!connectionStatusMap.isEmpty()) {
+            resetNodeStatuses(connectionStatusMap);
+        }
+
+        if (localNodeId != null) { // only set when exactly one unambiguous local entry survived the loop above
+            logger.debug("Recovered state indicating that Local Node Identifier is {}", localNodeId);
+            setLocalNodeIdentifier(localNodeId);
+        }
+    }
+
+    private void storeState() { // writes one JSON NodeIdentifierDescriptor per known node, keyed by node id, to LOCAL-scope state
+        final ObjectMapper mapper = new ObjectMapper();
+        final JsonFactory jsonFactory = new JsonFactory();
+        jsonFactory.setCodec(mapper); // the codec is required for jsonGenerator.writeObject(...) to serialize a bean
+
+        try {
+            final Map<String, String> stateMap = new HashMap<>();
+
+            final NodeIdentifier localNodeId = getLocalNodeIdentifier();
+            for (final NodeIdentifier nodeId : getNodeIdentifiers()) {
+                final boolean isLocalId = nodeId.equals(localNodeId); // flags the entry describing this node so recoverState() can restore the local id
+                final NodeIdentifierDescriptor descriptor = NodeIdentifierDescriptor.fromNodeIdentifier(nodeId, isLocalId);
+
+                try (final StringWriter writer = new StringWriter()) {
+                    final JsonGenerator jsonGenerator = jsonFactory.createGenerator(writer); // NOTE(review): the generator is never closed - confirm nothing is left buffered or leaked
+                    jsonGenerator.writeObject(descriptor);
+
+                    final String serializedDescriptor = writer.toString(); // read while the generator is still open; presumably writeObject has flushed by now - TODO confirm
+                    stateMap.put(nodeId.getId(), serializedDescriptor);
+                }
+            }
+
+            stateManager.setState(stateMap, Scope.LOCAL);
+            logger.debug("Stored the following state as the Cluster Topology: {}", stateMap);
+        } catch (final Exception e) { // best-effort: any failure is logged and not propagated to the caller
+            logger.warn("Failed to store cluster topology to local State Manager. Upon restart of NiFi, the cluster topology may not be accurate until joining the cluster.", e);
+        }
+    }
+
+
+    public void registerEventListener(final ClusterTopologyEventListener eventListener) { // listener will receive the local-node-id, node-added and node-removed callbacks fired by this class
+        this.eventListeners.add(eventListener);
+    }
+
+    public void unregisterEventListener(final ClusterTopologyEventListener eventListener) { // stops further callbacks to the given listener
+        this.eventListeners.remove(eventListener);
+    }
+
     @Override
     public void shutdown() {
         if (closed) {
@@ -136,8 +248,13 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
     @Override
     public void setLocalNodeIdentifier(final NodeIdentifier nodeId) {
+        if (nodeId == null || nodeId.equals(this.nodeId)) { // no-op for null or an unchanged id, so listeners are not notified again
+            return;
+        }
+
         this.nodeId = nodeId;
         nodeStatuses.computeIfAbsent(nodeId, id -> new NodeConnectionStatus(id, DisconnectionCode.NOT_YET_CONNECTED));
+        eventListeners.forEach(listener -> listener.onLocalNodeIdentifierSet(nodeId)); // fired only when the local id actually changed
     }
 
     @Override
@@ -170,7 +287,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         return localNodeId;
     }
 
-    private String getElectedActiveCoordinatorAddress() throws IOException {
+    private String getElectedActiveCoordinatorAddress() {
         return leaderElectionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
     }
 
@@ -185,11 +302,62 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
             final NodeConnectionStatus proposedStatus = entry.getValue();
 
             if (proposedStatus.getState() == NodeConnectionState.REMOVED) {
-                nodeStatuses.remove(nodeId);
+                removeNode(nodeId);
             } else {
-                nodeStatuses.put(nodeId, proposedStatus);
+                updateNodeStatus(nodeId, proposedStatus, false);
             }
         }
+
+        storeState();
+    }
+
+    private NodeConnectionStatus removeNode(final NodeIdentifier nodeId) { // unconditionally drops the node's status and events; returns the removed status, or null if none existed
+        final NodeConnectionStatus status = nodeStatuses.remove(nodeId);
+        nodeEvents.remove(nodeId);
+        if (status != null) { // listeners are notified only if the node was actually known
+            onNodeRemoved(nodeId);
+        }
+
+        return status;
+    }
+
+    private boolean removeNodeConditionally(final NodeIdentifier nodeId, final NodeConnectionStatus expectedStatus) { // removes the node only if its current status equals expectedStatus
+        final boolean removed = nodeStatuses.remove(nodeId, expectedStatus);
+        if (removed) { // events are dropped and listeners notified only when the removal actually happened
+            nodeEvents.remove(nodeId);
+            onNodeRemoved(nodeId);
+        }
+
+        return removed;
+    }
+
+    private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus) { // overload that asks for state to be persisted if the node turns out to be new
+        return updateNodeStatus(nodeId, updatedStatus, true);
+    }
+
+    private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus, final boolean storeState) { // unconditional put; returns the previous status or null
+        final NodeConnectionStatus evictedStatus = nodeStatuses.put(nodeId, updatedStatus);
+        if (evictedStatus == null) { // no previous mapping: this node is new, so fire the node-added path (which persists state only if storeState is true)
+            onNodeAdded(nodeId, storeState);
+        }
+
+        return evictedStatus;
+    }
+
+    private boolean updateNodeStatusConditionally(final NodeIdentifier nodeId, final NodeConnectionStatus expectedStatus, final NodeConnectionStatus updatedStatus) { // returns true if the map was changed
+        final boolean updated;
+        if (expectedStatus == null) { // caller expects the node to be absent: insert only if no status exists yet
+            final NodeConnectionStatus existingValue = nodeStatuses.putIfAbsent(nodeId, updatedStatus);
+            updated = existingValue == null;
+
+            if (updated) {
+                onNodeAdded(nodeId, true);
+            }
+        } else { // replace only if the current status still equals expectedStatus; no node-added callback on this path
+            updated = nodeStatuses.replace(nodeId, expectedStatus, updatedStatus);
+        }
+
+        return updated;
     }
 
     @Override
@@ -228,17 +396,21 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
         if (currentStatus == null) {
             if (newStatus.getState() == NodeConnectionState.REMOVED) {
-                return nodeStatuses.remove(nodeId, currentStatus);
+                return removeNodeConditionally(nodeId, currentStatus);
             } else {
-                final NodeConnectionStatus existingValue = nodeStatuses.putIfAbsent(nodeId, newStatus);
-                return existingValue == null;
+                return updateNodeStatusConditionally(nodeId, null, newStatus);
             }
         }
 
         if (newStatus.getState() == NodeConnectionState.REMOVED) {
-            return nodeStatuses.remove(nodeId, currentStatus);
+            if (removeNodeConditionally(nodeId, currentStatus)) {
+                storeState();
+                return true;
+            } else {
+                return false;
+            }
         } else {
-            return nodeStatuses.replace(nodeId, currentStatus, newStatus);
+            return updateNodeStatusConditionally(nodeId, currentStatus, newStatus);
         }
     }
 
@@ -348,9 +520,23 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     @Override
     public void removeNode(final NodeIdentifier nodeId, final String userDn) {
         reportEvent(nodeId, Severity.INFO, "User " + userDn + " requested that node be removed from cluster");
-        nodeStatuses.remove(nodeId);
-        nodeEvents.remove(nodeId);
         notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED));
+        removeNode(nodeId); // local removal now happens after the other nodes are notified; also drops events and fires onNodeRemoved
+
+        storeState(); // persist the topology without the removed node
+    }
+
+    private void onNodeRemoved(final NodeIdentifier nodeId) { // fans the removal out to every registered listener
+        eventListeners.stream().forEach(listener -> listener.onNodeRemoved(nodeId));
+    }
+
+    private void onNodeAdded(final NodeIdentifier nodeId, final boolean storeState) { // optionally persists the topology, then notifies every registered listener
+        if (storeState) { // callers pass false when they persist once themselves after a batch of updates
+            storeState();
+        }
+
+
+        eventListeners.stream().forEach(listener -> listener.onNodeAdded(nodeId));
     }
 
     @Override
@@ -381,8 +567,18 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     }
 
     @Override
-    public boolean isBlockedByFirewall(final String hostname) {
-        return firewall != null && !firewall.isPermissible(hostname);
+    public boolean isBlockedByFirewall(final Set<String> nodeIdentities) { // blocked only when a firewall is configured and none of the given identities is permissible
+        if (firewall == null) {
+            return false;
+        }
+
+        for (final String nodeId : nodeIdentities) {
+            if (firewall.isPermissible(nodeId)) { // a single permitted identity is enough to let the node through
+                return false;
+            }
+        }
+
+        return true; // also reached when nodeIdentities is empty, i.e. a node with no identities is blocked whenever a firewall is configured
     }
 
     @Override
@@ -455,28 +651,21 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     }
 
     private NodeIdentifier getElectedActiveCoordinatorNode(final boolean warnOnError) {
-        final String electedNodeAddress;
+        String electedNodeAddress;
         try {
             electedNodeAddress = getElectedActiveCoordinatorAddress();
         } catch (final NoClusterCoordinatorException ncce) {
             logger.debug("There is currently no elected active Cluster Coordinator");
             return null;
-        } catch (final IOException ioe) {
-            if (warnOnError) {
-                logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently: " + ioe);
-                if (logger.isDebugEnabled()) {
-                    logger.warn("", ioe);
-                }
-            }
-
-            return null;
         }
 
-        if (electedNodeAddress == null) {
+        if (electedNodeAddress == null || electedNodeAddress.trim().isEmpty()) {
             logger.debug("There is currently no elected active Cluster Coordinator");
             return null;
         }
 
+        electedNodeAddress = electedNodeAddress.trim();
+
         final int colonLoc = electedNodeAddress.indexOf(':');
         if (colonLoc < 1) {
             if (warnOnError) {
@@ -519,6 +708,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
                 final NodeConnectionStatus existingStatus = this.nodeStatuses.putIfAbsent(connectionStatus.getNodeIdentifier(), connectionStatus);
                 if (existingStatus == null) {
+                    onNodeAdded(connectionStatus.getNodeIdentifier(), true);
                     return connectionStatus.getNodeIdentifier();
                 } else {
                     return existingStatus.getNodeIdentifier();
@@ -594,7 +784,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         // this method is called when something occurs that causes this node to change the status of the
         // node in question. We only use comparisons against the current value when we receive an update
         // about a node status from a different node, since those may be received out-of-order.
-        final NodeConnectionStatus currentStatus = nodeStatuses.put(nodeId, status);
+        final NodeConnectionStatus currentStatus = updateNodeStatus(nodeId, status);
         final NodeConnectionState currentState = currentStatus == null ? null : currentStatus.getState();
         logger.info("Status of {} changed from {} to {}", nodeId, currentStatus, status);
         logger.debug("State of cluster nodes is now {}", nodeStatuses);
@@ -741,10 +931,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     }
 
     @Override
-    public ProtocolMessage handle(final ProtocolMessage protocolMessage) throws ProtocolException {
+    public ProtocolMessage handle(final ProtocolMessage protocolMessage, final Set<String> nodeIdentities) throws ProtocolException {
         switch (protocolMessage.getType()) {
             case CONNECTION_REQUEST:
-                return handleConnectionRequest((ConnectionRequestMessage) protocolMessage);
+                return handleConnectionRequest((ConnectionRequestMessage) protocolMessage, nodeIdentities);
             case NODE_STATUS_CHANGE:
                 handleNodeStatusChange((NodeStatusChangeMessage) protocolMessage);
                 return null;
@@ -790,9 +980,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
         // Either remove the value from the map or update the map depending on the connection state
         if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) {
-            nodeStatuses.remove(nodeId, oldStatus);
+            if (removeNodeConditionally(nodeId, oldStatus)) {
+                storeState();
+            }
         } else {
-            nodeStatuses.put(nodeId, updatedStatus);
+            updateNodeStatus(nodeId, updatedStatus);
         }
 
         logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
@@ -838,6 +1030,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
             // there is no node with that ID
             resolvedNodeId = proposedIdentifier;
             logger.debug("No existing node with ID {}; resolved node ID is as-proposed", proposedIdentifier.getId());
+            onNodeAdded(resolvedNodeId, true);
         } else if (existingStatus.getNodeIdentifier().logicallyEquals(proposedIdentifier)) {
             // there is a node with that ID but it's the same node.
             resolvedNodeId = proposedIdentifier;
@@ -854,28 +1047,37 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         return resolvedNodeId;
     }
 
-    private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage) {
+    private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage, final Set<String> nodeIdentities) {
         final NodeIdentifier proposedIdentifier = requestMessage.getConnectionRequest().getProposedNodeIdentifier();
-        final NodeIdentifier withRequestorDn = addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN());
+        final NodeIdentifier withNodeIdentities = addNodeIdentities(proposedIdentifier, nodeIdentities);
         final DataFlow dataFlow = requestMessage.getConnectionRequest().getDataFlow();
-        final ConnectionRequest requestWithDn = new ConnectionRequest(withRequestorDn, dataFlow);
+        final ConnectionRequest requestWithNodeIdentities = new ConnectionRequest(withNodeIdentities, dataFlow);
 
         // Resolve Node identifier.
         final NodeIdentifier resolvedNodeId = resolveNodeId(proposedIdentifier);
 
+        if (isBlockedByFirewall(nodeIdentities)) { // NOTE(review): resolveNodeId(...) above has already run for this node - confirm that is intended for blocked requests
+            // none of the node's identities is permitted by the firewall: reply with a blocked-by-firewall response (not a null response)
+            logger.info("Firewall blocked connection request from node " + resolvedNodeId + " with Node Identities " + nodeIdentities);
+            final ConnectionResponse response = ConnectionResponse.createBlockedByFirewallResponse();
+            final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
+            responseMessage.setConnectionResponse(response);
+            return responseMessage;
+        }
+
         if (requireElection) {
-            final DataFlow electedDataFlow = flowElection.castVote(dataFlow, withRequestorDn);
+            final DataFlow electedDataFlow = flowElection.castVote(dataFlow, withNodeIdentities);
             if (electedDataFlow == null) {
-                logger.info("Received Connection Request from {}; responding with Flow Election In Progress message", withRequestorDn);
+                logger.info("Received Connection Request from {}; responding with Flow Election In Progress message", withNodeIdentities);
                 return createFlowElectionInProgressResponse();
             } else {
-                logger.info("Received Connection Request from {}; responding with DataFlow that was elected", withRequestorDn);
-                return createConnectionResponse(requestWithDn, resolvedNodeId, electedDataFlow);
+                logger.info("Received Connection Request from {}; responding with DataFlow that was elected", withNodeIdentities);
+                return createConnectionResponse(requestWithNodeIdentities, resolvedNodeId, electedDataFlow);
             }
         }
 
-        logger.info("Received Connection Request from {}; responding with my DataFlow", withRequestorDn);
-        return createConnectionResponse(requestWithDn, resolvedNodeId);
+        logger.info("Received Connection Request from {}; responding with my DataFlow", withNodeIdentities);
+        return createConnectionResponse(requestWithNodeIdentities, resolvedNodeId);
     }
 
     private ConnectionResponseMessage createFlowElectionInProgressResponse() {
@@ -901,15 +1103,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
 
     private ConnectionResponseMessage createConnectionResponse(final ConnectionRequest request, final NodeIdentifier resolvedNodeIdentifier, final DataFlow clusterDataFlow) {
-        if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
-            // if the socket address is not listed in the firewall, then return a null response
-            logger.info("Firewall blocked connection request from node " + resolvedNodeIdentifier);
-            final ConnectionResponse response = ConnectionResponse.createBlockedByFirewallResponse();
-            final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
-            responseMessage.setConnectionResponse(response);
-            return responseMessage;
-        }
-
         if (clusterDataFlow == null) {
             final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
             responseMessage.setConnectionResponse(new ConnectionResponse(5, "The cluster dataflow is not yet available"));
@@ -936,11 +1129,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     }
 
 
-    private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) {
+    private NodeIdentifier addNodeIdentities(final NodeIdentifier nodeId, final Set<String> nodeIdentities) {
         return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(),
                 nodeId.getSocketAddress(), nodeId.getSocketPort(),
+                nodeId.getLoadBalanceAddress(), nodeId.getLoadBalancePort(),
                 nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(),
-                nodeId.getSiteToSiteHttpApiPort(), nodeId.isSiteToSiteSecure(), dn);
+                nodeId.getSiteToSiteHttpApiPort(), nodeId.isSiteToSiteSecure(), nodeIdentities);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/state/NodeIdentifierDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/state/NodeIdentifierDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/state/NodeIdentifierDescriptor.java
new file mode 100644
index 0000000..eeabbe3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/state/NodeIdentifierDescriptor.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node.state;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class NodeIdentifierDescriptor { // mutable getter/setter bean mirroring NodeIdentifier plus a "local node" flag; NodeClusterCoordinator serializes it to JSON to persist cluster topology
+    private String id;
+    private String apiAddress;
+    private int apiPort;
+    private String socketAddress;
+    private int socketPort;
+    private String loadBalanceAddress;
+    private int loadBalancePort;
+    private String siteToSiteAddress;
+    private Integer siteToSitePort;
+    private Integer siteToSiteHttpApiPort;
+    private Boolean siteToSiteSecure;
+    private Set<String> nodeIdentities;
+    private boolean localNodeIdentifier; // true when this entry describes the node that wrote the state
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(final String id) {
+        this.id = id;
+    }
+
+    public String getApiAddress() {
+        return apiAddress;
+    }
+
+    public void setApiAddress(final String apiAddress) {
+        this.apiAddress = apiAddress;
+    }
+
+    public int getApiPort() {
+        return apiPort;
+    }
+
+    public void setApiPort(final int apiPort) {
+        this.apiPort = apiPort;
+    }
+
+    public String getSocketAddress() {
+        return socketAddress;
+    }
+
+    public void setSocketAddress(final String socketAddress) {
+        this.socketAddress = socketAddress;
+    }
+
+    public int getSocketPort() {
+        return socketPort;
+    }
+
+    public void setSocketPort(final int socketPort) {
+        this.socketPort = socketPort;
+    }
+
+    public String getLoadBalanceAddress() {
+        return loadBalanceAddress;
+    }
+
+    public void setLoadBalanceAddress(final String loadBalanceAddress) {
+        this.loadBalanceAddress = loadBalanceAddress;
+    }
+
+    public int getLoadBalancePort() {
+        return loadBalancePort;
+    }
+
+    public void setLoadBalancePort(final int loadBalancePort) {
+        this.loadBalancePort = loadBalancePort;
+    }
+
+    public String getSiteToSiteAddress() {
+        return siteToSiteAddress;
+    }
+
+    public void setSiteToSiteAddress(final String siteToSiteAddress) {
+        this.siteToSiteAddress = siteToSiteAddress;
+    }
+
+    public Integer getSiteToSitePort() {
+        return siteToSitePort;
+    }
+
+    public void setSiteToSitePort(final Integer siteToSitePort) {
+        this.siteToSitePort = siteToSitePort;
+    }
+
+    public Integer getSiteToSiteHttpApiPort() {
+        return siteToSiteHttpApiPort;
+    }
+
+    public void setSiteToSiteHttpApiPort(final Integer siteToSiteHttpApiPort) {
+        this.siteToSiteHttpApiPort = siteToSiteHttpApiPort;
+    }
+
+    public Boolean getSiteToSiteSecure() {
+        return siteToSiteSecure;
+    }
+
+    public void setSiteToSiteSecure(final Boolean siteToSiteSecure) {
+        this.siteToSiteSecure = siteToSiteSecure;
+    }
+
+    public Set<String> getNodeIdentities() {
+        return nodeIdentities;
+    }
+
+    public void setNodeIdentities(final Set<String> nodeIdentities) {
+        this.nodeIdentities = Collections.unmodifiableSet(new HashSet<>(nodeIdentities)); // stores an unmodifiable defensive copy; NOTE(review): throws NullPointerException when given null
+    }
+
+    public boolean isLocalNodeIdentifier() {
+        return localNodeIdentifier;
+    }
+
+    public void setLocalNodeIdentifier(final boolean localNodeIdentifier) {
+        this.localNodeIdentifier = localNodeIdentifier;
+    }
+
+    public static NodeIdentifierDescriptor fromNodeIdentifier(final NodeIdentifier nodeId, final boolean localNodeId) { // copies every field of nodeId into a new descriptor and records the local flag
+        final NodeIdentifierDescriptor descriptor = new NodeIdentifierDescriptor();
+        descriptor.setId(nodeId.getId());
+        descriptor.setApiAddress(nodeId.getApiAddress());
+        descriptor.setApiPort(nodeId.getApiPort());
+        descriptor.setSocketAddress(nodeId.getSocketAddress());
+        descriptor.setSocketPort(nodeId.getSocketPort());
+        descriptor.setSiteToSiteAddress(nodeId.getSiteToSiteAddress());
+        descriptor.setSiteToSitePort(nodeId.getSiteToSitePort());
+        descriptor.setSiteToSiteHttpApiPort(nodeId.getSiteToSiteHttpApiPort());
+        descriptor.setSiteToSiteSecure(nodeId.isSiteToSiteSecure());
+        descriptor.setNodeIdentities(nodeId.getNodeIdentities());
+        descriptor.setLoadBalanceAddress(nodeId.getLoadBalanceAddress());
+        descriptor.setLoadBalancePort(nodeId.getLoadBalancePort());
+        descriptor.setLocalNodeIdentifier(localNodeId);
+        return descriptor;
+    }
+
+    public NodeIdentifier toNodeIdentifier() { // inverse of fromNodeIdentifier, except that the local flag is not carried into the NodeIdentifier
+        return new NodeIdentifier(getId(), getApiAddress(), getApiPort(), getSocketAddress(), getSocketPort(), getLoadBalanceAddress(), getLoadBalancePort(),
+            getSiteToSiteAddress(), getSiteToSitePort(), getSiteToSiteHttpApiPort(), getSiteToSiteSecure(), getNodeIdentities()); // NOTE(review): boxed fields may be null if absent from the JSON - confirm the constructor tolerates that
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
index 7e3bc5d..a25fb4b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
@@ -17,10 +17,12 @@
 package org.apache.nifi.cluster.manager;
 
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.ConnectionDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
 
 import java.util.Map;
+import java.util.Objects;
 
 public class ConnectionEntityMerger implements ComponentEntityMerger<ConnectionEntity>, ComponentEntityStatusMerger<ConnectionStatusDTO> {
 
@@ -33,6 +35,22 @@ public class ConnectionEntityMerger implements ComponentEntityMerger<ConnectionE
                 mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey());
             }
         }
+
+        // If Load Balancing is configured but client entity indicates that data is not being transferred, we need to check if any other
+        // node is actively transferring data. If Client Entity is transferring data, we already know the correct value for the Status,
+        // and if the Connection is not configured for Load Balancing, then we also know the correct value, so no need to look at all of
+        // the values of the other nodes.
+        if (clientEntity.getComponent() != null && ConnectionDTO.LOAD_BALANCE_INACTIVE.equals(clientEntity.getComponent().getLoadBalanceStatus())) {
+            final boolean anyActive = entityMap.values().stream()
+                .map(ConnectionEntity::getComponent)
+                .filter(Objects::nonNull)
+                .map(ConnectionDTO::getLoadBalanceStatus)
+                .anyMatch(status -> status.equals(ConnectionDTO.LOAD_BALANCE_ACTIVE));
+
+            if (anyActive) {
+                clientEntity.getComponent().setLoadBalanceStatus(ConnectionDTO.LOAD_BALANCE_ACTIVE);
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java
index 1f163f4..826ecf7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java
@@ -17,22 +17,27 @@
 
 package org.apache.nifi.cluster.manager;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO;
 import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
+import org.apache.nifi.web.api.dto.diagnostics.LocalQueuePartitionDTO;
 import org.apache.nifi.web.api.dto.diagnostics.NodeJVMDiagnosticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.diagnostics.RemoteQueuePartitionDTO;
 import org.apache.nifi.web.api.dto.diagnostics.ThreadDumpDTO;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<ProcessorDiagnosticsEntity> {
     private final long componentStatusSnapshotMillis;
 
@@ -46,6 +51,11 @@ public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<P
 
         final List<NodeJVMDiagnosticsSnapshotDTO> nodeJvmDiagnosticsSnapshots = new ArrayList<>(entityMap.size());
 
+        // Merge connection diagnostics
+        mergeConnectionDiagnostics(clientEntity, entityMap, entity -> entity.getComponent().getIncomingConnections());
+        mergeConnectionDiagnostics(clientEntity, entityMap, entity -> entity.getComponent().getOutgoingConnections());
+
+
         // Merge the Processor Statuses and create a separate NodeJVMDiagnosticsSnapshotDTO for each. We do both of these
         // together simply because we are already iterating over the entityMap and we have to create the Node-specific JVM diagnostics
         // before we start merging the values, in the second iteration over the map.
@@ -99,7 +109,7 @@ public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<P
 
         // Merge permissions on referenced controller services
         final Map<String, ControllerServiceEntity> serviceEntityById = clientDto.getReferencedControllerServices().stream()
-            .map(diagnosticsDto -> diagnosticsDto.getControllerService())
+            .map(ControllerServiceDiagnosticsDTO::getControllerService)
             .collect(Collectors.toMap(ControllerServiceEntity::getId, Function.identity()));
 
         for (final Map.Entry<NodeIdentifier, ProcessorDiagnosticsEntity> entry : entityMap.entrySet()) {
@@ -114,6 +124,129 @@ public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<P
                 }
             }
         }
+    }
+
+    /**
+     * Merges the per-node diagnostics of each connection chosen by {@code extractConnections} into the DTOs held by
+     * {@code clientEntity}: every node's aggregate snapshot is tagged with that node's API address and port and
+     * recorded as a node snapshot, and the client's aggregate snapshot is replaced by the merge of all of them.
+     */
+    private void mergeConnectionDiagnostics(final ProcessorDiagnosticsEntity clientEntity, final Map<NodeIdentifier, ProcessorDiagnosticsEntity> entityMap,
+                                                                     final Function<ProcessorDiagnosticsEntity, Set<ConnectionDiagnosticsDTO>> extractConnections) {
+        final Map<String, List<ConnectionDiagnosticsSnapshotDTO>> snapshotByConnectionId = new HashMap<>();
+        final Map<String, ConnectionDiagnosticsDTO> connectionById = new HashMap<>();
+
+        for (final Map.Entry<NodeIdentifier, ProcessorDiagnosticsEntity> entry : entityMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final ProcessorDiagnosticsEntity entity = entry.getValue();
+            for (final ConnectionDiagnosticsDTO connectionDiagnostics : extractConnections.apply(entity)) {
+                final String connectionId = connectionDiagnostics.getConnection().getId();
+                final ConnectionDiagnosticsSnapshotDTO snapshot = connectionDiagnostics.getAggregateSnapshot();
+                snapshot.setNodeIdentifier(nodeId.getApiAddress() + ":" + nodeId.getApiPort());
+                snapshotByConnectionId.computeIfAbsent(connectionId, id -> new ArrayList<>()).add(snapshot);
+
+                // Remember the client's own DTOs (matched by reference); they receive the merged result below.
+                if (entity == clientEntity) {
+                    connectionById.put(connectionId, connectionDiagnostics);
+                }
+            }
+        }
+
+        for (final Map.Entry<String, List<ConnectionDiagnosticsSnapshotDTO>> entry : snapshotByConnectionId.entrySet()) {
+            final ConnectionDiagnosticsDTO dto = connectionById.get(entry.getKey());
+            if (dto == null) {
+                continue; // reported by another node but absent from the client entity, so there is no DTO to merge into
+            }
+            final List<ConnectionDiagnosticsSnapshotDTO> snapshots = entry.getValue();
+            dto.setNodeSnapshots(snapshots);
+            dto.setAggregateSnapshot(mergeConnectionSnapshots(snapshots));
+        }
+    }
+
+
+
+    /**
+     * Builds a new aggregate snapshot from the given per-node snapshots: totals and local-partition counters are
+     * summed, the penalized flags are combined, and remote partitions are merged per node identifier.
+     */
+    private ConnectionDiagnosticsSnapshotDTO mergeConnectionSnapshots(final List<ConnectionDiagnosticsSnapshotDTO> snapshots) {
+        // Start every counter at zero so that each snapshot can simply be added on top.
+        final LocalQueuePartitionDTO localTotals = new LocalQueuePartitionDTO();
+        localTotals.setTotalFlowFileCount(0);
+        localTotals.setTotalByteCount(0);
+        localTotals.setActiveQueueFlowFileCount(0);
+        localTotals.setActiveQueueByteCount(0);
+        localTotals.setInFlightFlowFileCount(0);
+        localTotals.setInFlightByteCount(0);
+        localTotals.setSwapFlowFileCount(0);
+        localTotals.setSwapByteCount(0);
+        localTotals.setSwapFiles(0);
+        localTotals.setAllActiveQueueFlowFilesPenalized(true); // identity for the AND applied per snapshot below
+        localTotals.setAnyActiveQueueFlowFilesPenalized(false); // identity for the OR applied per snapshot below
+
+        final ConnectionDiagnosticsSnapshotDTO result = new ConnectionDiagnosticsSnapshotDTO();
+        result.setLocalQueuePartition(localTotals);
+        result.setTotalFlowFileCount(0);
+        result.setTotalByteCount(0L);
+
+        final Map<String, List<RemoteQueuePartitionDTO>> remoteByNode = new HashMap<>();
+        for (final ConnectionDiagnosticsSnapshotDTO nodeSnapshot : snapshots) {
+            result.setTotalFlowFileCount(result.getTotalFlowFileCount() + nodeSnapshot.getTotalFlowFileCount());
+            result.setTotalByteCount(result.getTotalByteCount() + nodeSnapshot.getTotalByteCount());
+
+            final LocalQueuePartitionDTO nodeLocal = nodeSnapshot.getLocalQueuePartition();
+            localTotals.setTotalFlowFileCount(localTotals.getTotalFlowFileCount() + nodeLocal.getTotalFlowFileCount());
+            localTotals.setTotalByteCount(localTotals.getTotalByteCount() + nodeLocal.getTotalByteCount());
+            localTotals.setActiveQueueFlowFileCount(localTotals.getActiveQueueFlowFileCount() + nodeLocal.getActiveQueueFlowFileCount());
+            localTotals.setActiveQueueByteCount(localTotals.getActiveQueueByteCount() + nodeLocal.getActiveQueueByteCount());
+            localTotals.setInFlightFlowFileCount(localTotals.getInFlightFlowFileCount() + nodeLocal.getInFlightFlowFileCount());
+            localTotals.setInFlightByteCount(localTotals.getInFlightByteCount() + nodeLocal.getInFlightByteCount());
+            localTotals.setSwapFlowFileCount(localTotals.getSwapFlowFileCount() + nodeLocal.getSwapFlowFileCount());
+            localTotals.setSwapByteCount(localTotals.getSwapByteCount() + nodeLocal.getSwapByteCount());
+            localTotals.setSwapFiles(localTotals.getSwapFiles() + nodeLocal.getSwapFiles());
+            localTotals.setAllActiveQueueFlowFilesPenalized(localTotals.getAllActiveQueueFlowFilesPenalized() && nodeLocal.getAllActiveQueueFlowFilesPenalized());
+            localTotals.setAnyActiveQueueFlowFilesPenalized(localTotals.getAnyActiveQueueFlowFilesPenalized() || nodeLocal.getAnyActiveQueueFlowFilesPenalized());
+
+            // Group remote partitions by the node they refer to; each group is collapsed into one entry afterwards.
+            for (final RemoteQueuePartitionDTO remote : nodeSnapshot.getRemoteQueuePartitions()) {
+                remoteByNode.computeIfAbsent(remote.getNodeIdentifier(), key -> new ArrayList<>()).add(remote);
+            }
+        }
+
+        final List<RemoteQueuePartitionDTO> mergedRemotes = new ArrayList<>();
+        for (final List<RemoteQueuePartitionDTO> group : remoteByNode.values()) {
+            mergedRemotes.add(mergeRemoteQueuePartitions(group));
+        }
+        result.setRemoteQueuePartitions(mergedRemotes);
+
+        return result;
+    }
+
+    /** Sums the counters of the given partitions into a new DTO, which takes the node identifier of the last partition in the list. */
+    private RemoteQueuePartitionDTO mergeRemoteQueuePartitions(final List<RemoteQueuePartitionDTO> partitions) {
+        final RemoteQueuePartitionDTO total = new RemoteQueuePartitionDTO();
+        total.setTotalFlowFileCount(0);
+        total.setTotalByteCount(0);
+        total.setActiveQueueFlowFileCount(0);
+        total.setActiveQueueByteCount(0);
+        total.setInFlightFlowFileCount(0);
+        total.setInFlightByteCount(0);
+        total.setSwapFlowFileCount(0);
+        total.setSwapByteCount(0);
+        total.setSwapFiles(0);
+        for (final RemoteQueuePartitionDTO current : partitions) {
+            total.setNodeIdentifier(current.getNodeIdentifier());
+            total.setTotalFlowFileCount(total.getTotalFlowFileCount() + current.getTotalFlowFileCount());
+            total.setTotalByteCount(total.getTotalByteCount() + current.getTotalByteCount());
+            total.setActiveQueueFlowFileCount(total.getActiveQueueFlowFileCount() + current.getActiveQueueFlowFileCount());
+            total.setActiveQueueByteCount(total.getActiveQueueByteCount() + current.getActiveQueueByteCount());
+            total.setInFlightFlowFileCount(total.getInFlightFlowFileCount() + current.getInFlightFlowFileCount());
+            total.setInFlightByteCount(total.getInFlightByteCount() + current.getInFlightByteCount());
+            total.setSwapFlowFileCount(total.getSwapFlowFileCount() + current.getSwapFlowFileCount());
+            total.setSwapByteCount(total.getSwapByteCount() + current.getSwapByteCount());
+            total.setSwapFiles(total.getSwapFiles() + current.getSwapFiles());
+        }
 
+        return total;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
index 1915b9b..9c833b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
@@ -218,7 +218,7 @@ public class TestPopularVoteFlowElection {
     }
 
     private NodeIdentifier createNodeId(final int index) {
-        return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + index, true);
+        return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + index, true);
     }
 
     private DataFlow createDataFlow(final byte[] flow) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 50bdd0d..6ea019d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -19,6 +19,7 @@ package org.apache.nifi.cluster.coordination.heartbeat;
 
 import org.apache.nifi.cluster.ReportedEvent;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -55,7 +56,7 @@ public class TestAbstractHeartbeatMonitor {
     @Before
     public void setup() throws Exception {
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties");
-        nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", null, null, false);
+        nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", 777, "localhost", null, null, false);
     }
 
     @After
@@ -136,7 +137,7 @@ public class TestAbstractHeartbeatMonitor {
     @Test
     public void testDisconnectionOfTerminatedNodeDueToLackOfHeartbeat() throws Exception {
         final NodeIdentifier nodeId1 = nodeId;
-        final NodeIdentifier nodeId2 = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 7777, "localhost", 6666, "localhost", null, null, false);
+        final NodeIdentifier nodeId2 = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 7777, "localhost", 6666, "localhost", 5555, "localhost", null, null, false);
 
         final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter();
         final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
@@ -272,7 +273,7 @@ public class TestAbstractHeartbeatMonitor {
         }
 
         @Override
-        public synchronized boolean isBlockedByFirewall(String hostname) {
+        public synchronized boolean isBlockedByFirewall(Set<String> nodeIds) {
             return false;
         }
 
@@ -369,6 +370,14 @@ public class TestAbstractHeartbeatMonitor {
         public Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException {
             return null;
         }
+
+        @Override
+        public void registerEventListener(final ClusterTopologyEventListener eventListener) {
+        }
+
+        @Override
+        public void unregisterEventListener(final ClusterTopologyEventListener eventListener) {
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
index f2d3a24..75b7ae3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
@@ -64,7 +64,8 @@ class StandardHttpResponseMapperSpec extends Specification {
             int n = it.node
             def response = Mock(Response)
             mockToRequestEntity.put response, it
-            new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L, requestId)
+            new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L,
+                    requestId)
         } as Set
 
         when:
@@ -102,7 +103,8 @@ class StandardHttpResponseMapperSpec extends Specification {
             ++n
             def response = Mock(Response)
             mockToRequestEntity.put response, it
-            new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L, requestId)
+            new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'sktaddr', n * 11, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L,
+                    requestId)
         } as Set
 
         when:

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java
index c1cfdf8..a93cd68 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java
@@ -39,7 +39,7 @@ public class CurrentUserEndpointMergerTest {
 
     @Test
     public void testMergeUserPermissions() {
-        final NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", 9000, "localhost", 9001, "localhost", 9002, 9003, false);
+        final NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", 9000, "localhost", 9001, "localhost", 9006, "localhost", 9002, 9003, false);
         final CurrentUserEntity userNode1 = new CurrentUserEntity();
         userNode1.setControllerPermissions(buildPermissions(true, false));
         userNode1.setCountersPermissions(buildPermissions(true, true));
@@ -55,7 +55,7 @@ public class CurrentUserEndpointMergerTest {
         componentRestrictionsNode1.add(buildComponentRestriction(RequiredPermission.READ_FILESYSTEM, true, true));
         userNode1.setComponentRestrictionPermissions(componentRestrictionsNode1);
 
-        final NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false);
+        final NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", 8000, "localhost", 8001, "localhost", 9006,"localhost", 8002, 8003, false);
         final CurrentUserEntity userNode2 = new CurrentUserEntity();
         userNode2.setControllerPermissions(buildPermissions(false, true));
         userNode2.setCountersPermissions(buildPermissions(true, false));

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy
index 232c562..104e69b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy
@@ -57,7 +57,8 @@ class StatusHistoryEndpointMergerSpec extends Specification {
             ++n
             def response = Mock(Response)
             mockToRequestEntity.put response, it
-            new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L, requestId)
+            new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, null, n * 10, 'stsaddr', n * 100, n * 1000, false, null),
+                    "GET", requestUri, response, 500L, requestId)
         } as Set
 
         when: