You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/10/03 17:58:28 UTC

nifi git commit: NIFI-2825: Fix S2S getPeers flow file count

Repository: nifi
Updated Branches:
  refs/heads/master 34e5a5321 -> 17a36c6fd


NIFI-2825: Fix S2S getPeers flow file count

- Added ClusterWorkload message to retrieve workload information from a
  cluster coordinator
- Use cluster workload to return queued flow file count to site-to-site
  client so that it can calculate distribution of data transfer

This closes #1084.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/17a36c6f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/17a36c6f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/17a36c6f

Branch: refs/heads/master
Commit: 17a36c6fd525dba837e13a71a52ddfd224782fae
Parents: 34e5a53
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Sep 28 11:07:22 2016 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Oct 3 13:28:01 2016 -0400

----------------------------------------------------------------------
 .../coordination/ClusterCoordinator.java        |  8 +++
 .../cluster/coordination/node/NodeWorkload.java | 71 ++++++++++++++++++++
 .../protocol/AbstractNodeProtocolSender.java    | 17 +++++
 .../cluster/protocol/NodeProtocolSender.java    | 10 +++
 .../impl/NodeProtocolSenderListener.java        |  7 ++
 .../protocol/jaxb/message/ObjectFactory.java    | 11 +++
 .../message/ClusterWorkloadRequestMessage.java  | 30 +++++++++
 .../message/ClusterWorkloadResponseMessage.java | 43 ++++++++++++
 .../protocol/message/ProtocolMessage.java       |  4 +-
 .../jaxb/message/TestJaxbProtocolUtils.java     | 56 +++++++++++++++
 .../ClusterProtocolHeartbeatMonitor.java        | 39 +++++++++--
 .../node/NodeClusterCoordinator.java            | 15 ++++-
 .../NodeClusterCoordinatorFactoryBean.java      |  5 +-
 .../heartbeat/TestAbstractHeartbeatMonitor.java |  6 ++
 .../node/TestNodeClusterCoordinator.java        |  8 ++-
 .../apache/nifi/cluster/integration/Node.java   |  3 +-
 .../ClusterCoordinatorNodeInformant.java        | 23 ++++---
 .../apache/nifi/web/api/SiteToSiteResource.java | 31 +++++----
 .../nifi/web/api/TestSiteToSiteResource.java    | 63 +++++++++++++++++
 19 files changed, 414 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index 723a374..1083fe6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.cluster.coordination;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -24,6 +25,7 @@ import java.util.Set;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
 import org.apache.nifi.cluster.event.NodeEvent;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.reporting.Severity;
@@ -236,4 +238,10 @@ public interface ClusterCoordinator {
      * @return the current status of Flow Election.
      */
     String getFlowElectionStatus();
+
+    /**
+     * @return the current cluster workload retrieved from the cluster coordinator.
+     * @throws IOException thrown when it failed to communicate with the cluster coordinator.
+     */
+    Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeWorkload.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeWorkload.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeWorkload.java
new file mode 100644
index 0000000..be5653e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeWorkload.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "nodeWorkload")
+public class NodeWorkload {
+
+    private long reportedTimestamp;
+    private int flowFileCount;
+    private long flowFileBytes;
+    private int activeThreadCount;
+    private long systemStartTime;
+
+    public long getReportedTimestamp() {
+        return reportedTimestamp;
+    }
+
+    public void setReportedTimestamp(long reportedTimestamp) {
+        this.reportedTimestamp = reportedTimestamp;
+    }
+
+    public int getFlowFileCount() {
+        return flowFileCount;
+    }
+
+    public void setFlowFileCount(int flowFileCount) {
+        this.flowFileCount = flowFileCount;
+    }
+
+    public long getFlowFileBytes() {
+        return flowFileBytes;
+    }
+
+    public void setFlowFileBytes(long flowFileBytes) {
+        this.flowFileBytes = flowFileBytes;
+    }
+
+    public int getActiveThreadCount() {
+        return activeThreadCount;
+    }
+
+    public void setActiveThreadCount(int activeThreadCount) {
+        this.activeThreadCount = activeThreadCount;
+    }
+
+    public long getSystemStartTime() {
+        return systemStartTime;
+    }
+
+    public void setSystemStartTime(long systemStartTime) {
+        this.systemStartTime = systemStartTime;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
index 22d6ebc..db3fc1d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@@ -94,6 +96,21 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
         throw new ProtocolException("Expected message type '" + MessageType.HEARTBEAT_RESPONSE + "' but found '" + responseMessage.getType() + "'");
     }
 
+    @Override
+    public ClusterWorkloadResponseMessage clusterWorkload(final ClusterWorkloadRequestMessage msg) throws ProtocolException {
+        final InetSocketAddress serviceAddress;
+        try {
+            serviceAddress = getServiceAddress();
+        } catch (IOException e) {
+            throw new ProtocolException("Failed to getServiceAddress due to " + e, e);
+        }
+        final ProtocolMessage responseMessage = sendProtocolMessage(msg, serviceAddress.getHostName(), serviceAddress.getPort());
+        if (MessageType.CLUSTER_WORKLOAD_RESPONSE == responseMessage.getType()) {
+            return (ClusterWorkloadResponseMessage) responseMessage;
+        }
+
+        throw new ProtocolException("Expected message type '" + MessageType.CLUSTER_WORKLOAD_RESPONSE + "' but found '" + responseMessage.getType() + "'");
+    }
 
     private Socket createSocket() {
         InetSocketAddress socketAddress = null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
index fcf5195..bfcc62c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.cluster.protocol;
 
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@@ -49,4 +51,12 @@ public interface NodeProtocolSender {
      * @return the response from the Cluster Coordinator
      */
     HeartbeatResponseMessage heartbeat(HeartbeatMessage msg, String address) throws ProtocolException;
+
+    /**
+     * Sends a "cluster workflow request" message to the Cluster Coordinator.
+     * @param msg a request message
+     * @return the response from the Cluster Coordinator
+     * @throws ProtocolException if communication failed
+     */
+    ClusterWorkloadResponseMessage clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
index 1b0aeea..d13b3d3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
@@ -24,6 +24,8 @@ import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.ProtocolListener;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@@ -96,4 +98,9 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL
     public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
         return sender.heartbeat(msg, address);
     }
+
+    @Override
+    public ClusterWorkloadResponseMessage clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException {
+        return sender.clusterWorkload(msg);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index afa87b9..9a594a4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -25,6 +25,8 @@ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
 import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
 import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
@@ -101,4 +103,13 @@ public class ObjectFactory {
     public HeartbeatResponseMessage createHeartbeatResponse() {
         return new HeartbeatResponseMessage();
     }
+
+    public ClusterWorkloadRequestMessage createClusterWorkloadRequest() {
+        return new ClusterWorkloadRequestMessage();
+    }
+
+    public ClusterWorkloadResponseMessage createClusterWorkloadResponse() {
+        return new ClusterWorkloadResponseMessage();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadRequestMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadRequestMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadRequestMessage.java
new file mode 100644
index 0000000..d8f87a4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadRequestMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "clusterWorkloadRequest")
+public class ClusterWorkloadRequestMessage extends ProtocolMessage {
+
+    @Override
+    public MessageType getType() {
+        return MessageType.CLUSTER_WORKLOAD_REQUEST;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadResponseMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadResponseMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadResponseMessage.java
new file mode 100644
index 0000000..2852c48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadResponseMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.message;
+
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Map;
+
+@XmlRootElement(name = "clusterWorkloadResponse")
+public class ClusterWorkloadResponseMessage extends ProtocolMessage {
+
+    private Map<NodeIdentifier, NodeWorkload> nodeWorkloads;
+
+    @Override
+    public MessageType getType() {
+        return MessageType.CLUSTER_WORKLOAD_RESPONSE;
+    }
+
+    public Map<NodeIdentifier, NodeWorkload> getNodeWorkloads() {
+        return nodeWorkloads;
+    }
+
+    public void setNodeWorkloads(Map<NodeIdentifier, NodeWorkload> nodeWorkloads) {
+        this.nodeWorkloads = nodeWorkloads;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index 1d0d115..1cab62f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -35,7 +35,9 @@ public abstract class ProtocolMessage {
         HEARTBEAT_RESPONSE,
         NODE_CONNECTION_STATUS_REQUEST,
         NODE_CONNECTION_STATUS_RESPONSE,
-        NODE_STATUS_CHANGE;
+        NODE_STATUS_CHANGE,
+        CLUSTER_WORKLOAD_REQUEST,
+        CLUSTER_WORKLOAD_RESPONSE
     }
 
     public abstract MessageType getType();

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
index 4fa53e8..8c2cca6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
@@ -23,12 +23,16 @@ import static org.junit.Assert.assertTrue;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
 
 import javax.xml.bind.JAXBException;
 
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
 import org.apache.nifi.cluster.protocol.ComponentRevision;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.DataFlow;
@@ -38,6 +42,8 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
 import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
 import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
 import org.apache.nifi.web.Revision;
@@ -124,4 +130,54 @@ public class TestJaxbProtocolUtils {
         final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
         assertTrue(unmarshalled instanceof HeartbeatMessage);
     }
+
+    @Test
+    public void testRoundTripClusterWorkloadRequest() throws JAXBException {
+        final ClusterWorkloadRequestMessage msg = new ClusterWorkloadRequestMessage();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
+        final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
+        assertTrue(unmarshalled instanceof ClusterWorkloadRequestMessage);
+    }
+
+    @Test
+    public void testRoundTripClusterWorkloadResponse() throws JAXBException {
+        final ClusterWorkloadResponseMessage msg = new ClusterWorkloadResponseMessage();
+        final Map<NodeIdentifier, NodeWorkload> expectedNodeWorkloads = new HashMap<>();
+
+        IntStream.range(1, 4).forEach(i -> {
+            final String hostname = "node" + i;
+            final NodeIdentifier nodeId = new NodeIdentifier(hostname, hostname, 8080, hostname, 8081, hostname, 8082, 8083, false);
+            final NodeWorkload workload = new NodeWorkload();
+            workload.setReportedTimestamp(System.currentTimeMillis() - 1000);
+            workload.setSystemStartTime(System.currentTimeMillis());
+            workload.setActiveThreadCount(i);
+            workload.setFlowFileCount(i * 10);
+            workload.setFlowFileBytes(i * 10 * 1024);
+            expectedNodeWorkloads.put(nodeId, workload);
+        });
+        msg.setNodeWorkloads(expectedNodeWorkloads);
+
+        // Marshall.
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
+
+        // Un-marshall.
+        final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
+        assertTrue(unmarshalled instanceof ClusterWorkloadResponseMessage);
+
+        // Assert result.
+        final ClusterWorkloadResponseMessage response = (ClusterWorkloadResponseMessage) unmarshalled;
+        assertEquals(expectedNodeWorkloads.size(), response.getNodeWorkloads().size());
+        response.getNodeWorkloads().entrySet().stream().forEach(entry -> {
+            assertTrue(expectedNodeWorkloads.containsKey(entry.getKey()));
+            final NodeWorkload w = entry.getValue();
+            NodeWorkload expectedW = expectedNodeWorkloads.get(entry.getKey());
+            assertEquals(expectedW.getActiveThreadCount(), w.getActiveThreadCount());
+            assertEquals(expectedW.getReportedTimestamp(), w.getReportedTimestamp());
+            assertEquals(expectedW.getSystemStartTime(), w.getSystemStartTime());
+            assertEquals(expectedW.getFlowFileBytes(), w.getFlowFileBytes());
+            assertEquals(expectedW.getFlowFileCount(), w.getFlowFileCount());
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index 9f620d9..6a8e575 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -18,6 +18,7 @@ package org.apache.nifi.cluster.coordination.heartbeat;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,6 +31,7 @@ import javax.xml.bind.Unmarshaller;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
 import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -38,6 +40,8 @@ import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.ProtocolListener;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.util.NiFiProperties;
@@ -130,11 +134,18 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
 
     @Override
     public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
-        if (msg.getType() != MessageType.HEARTBEAT) {
-            throw new ProtocolException("Cannot handle message of type " + msg.getType());
+        switch (msg.getType()) {
+            case HEARTBEAT:
+                return handleHeartbeat((HeartbeatMessage) msg);
+            case CLUSTER_WORKLOAD_REQUEST:
+                return handleClusterWorkload((ClusterWorkloadRequestMessage) msg);
+            default:
+                throw new ProtocolException("Cannot handle message of type " + msg.getType());
         }
+    }
 
-        final HeartbeatMessage heartbeatMsg = (HeartbeatMessage) msg;
+    private ProtocolMessage handleHeartbeat(final HeartbeatMessage msg) {
+        final HeartbeatMessage heartbeatMsg = msg;
         final Heartbeat heartbeat = heartbeatMsg.getHeartbeat();
 
         final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
@@ -169,6 +180,26 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
         return responseMessage;
     }
 
+    private ProtocolMessage handleClusterWorkload(final ClusterWorkloadRequestMessage msg) {
+
+        final ClusterWorkloadResponseMessage response = new ClusterWorkloadResponseMessage();
+        final Map<NodeIdentifier, NodeWorkload> workloads = new HashMap<>();
+        getLatestHeartbeats().values().stream()
+            .filter(hb -> NodeConnectionState.CONNECTED.equals(hb.getConnectionStatus().getState()))
+            .forEach(hb -> {
+                NodeWorkload wl = new NodeWorkload();
+                wl.setReportedTimestamp(hb.getTimestamp());
+                wl.setSystemStartTime(hb.getSystemStartTime());
+                wl.setActiveThreadCount(hb.getActiveThreadCount());
+                wl.setFlowFileCount(hb.getFlowFileCount());
+                wl.setFlowFileBytes(hb.getFlowFileBytes());
+                workloads.put(hb.getNodeIdentifier(), wl);
+            });
+        response.setNodeWorkloads(workloads);
+
+        return response;
+    }
+
     private List<NodeConnectionStatus> getUpdatedStatuses(final List<NodeConnectionStatus> nodeStatusList) {
         // Map node's statuses by NodeIdentifier for quick & easy lookup
         final Map<NodeIdentifier, NodeConnectionStatus> nodeStatusMap = nodeStatusList.stream()
@@ -201,6 +232,6 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
 
     @Override
     public boolean canHandle(ProtocolMessage msg) {
-        return msg.getType() == MessageType.HEARTBEAT;
+        return msg.getType() == MessageType.HEARTBEAT || msg.getType() == MessageType.CLUSTER_WORKLOAD_REQUEST;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index a6a6009..4b74e1b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -49,10 +49,13 @@ import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
@@ -88,6 +91,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     private final LeaderElectionManager leaderElectionManager;
     private final AtomicLong latestUpdateId = new AtomicLong(-1);
     private final FlowElection flowElection;
+    private final NodeProtocolSender nodeProtocolSender;
 
     private volatile FlowService flowService;
     private volatile boolean connected;
@@ -98,7 +102,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
 
     public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager,
-            final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties) {
+            final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties,
+            final NodeProtocolSender nodeProtocolSender) {
         this.senderListener = senderListener;
         this.flowService = null;
         this.eventReporter = eventReporter;
@@ -107,6 +112,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         this.nifiProperties = nifiProperties;
         this.leaderElectionManager = leaderElectionManager;
         this.flowElection = flowElection;
+        this.nodeProtocolSender = nodeProtocolSender;
 
         senderListener.addHandler(this);
     }
@@ -1043,4 +1049,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     public boolean isConnected() {
         return connected;
     }
+
+    @Override
+    public Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException {
+        final ClusterWorkloadRequestMessage request = new ClusterWorkloadRequestMessage();
+        final ClusterWorkloadResponseMessage response = nodeProtocolSender.clusterWorkload(request);
+        return response.getNodeWorkloads();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
index 2845a01..ac79a42 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
@@ -20,6 +20,7 @@ package org.apache.nifi.cluster.spring;
 import org.apache.nifi.cluster.coordination.flow.FlowElection;
 import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
 import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.apache.nifi.events.EventReporter;
@@ -46,8 +47,8 @@ public class NodeClusterCoordinatorFactoryBean implements FactoryBean<NodeCluste
             final RevisionManager revisionManager = applicationContext.getBean("revisionManager", RevisionManager.class);
             final LeaderElectionManager electionManager = applicationContext.getBean("leaderElectionManager", LeaderElectionManager.class);
             final FlowElection flowElection = applicationContext.getBean("flowElection", FlowElection.class);
-
-            nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, clusterFirewall, revisionManager, properties);
+            final NodeProtocolSender nodeProtocolSender = applicationContext.getBean("nodeProtocolSender", NodeProtocolSender.class);
+            nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, clusterFirewall, revisionManager, properties, nodeProtocolSender);
         }
 
         return nodeClusterCoordinator;

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index e83999a..690cda8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -36,6 +36,7 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
 import org.apache.nifi.cluster.event.NodeEvent;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.reporting.Severity;
@@ -329,6 +330,11 @@ public class TestAbstractHeartbeatMonitor {
         public String getFlowElectionStatus() {
             return null;
         }
+
+        @Override
+        public Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException {
+            return null;
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index be9b862..e2577a9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -80,7 +80,7 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
         Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
+        coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties(), null) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
                 nodeStatuses.add(updatedStatus);
@@ -135,7 +135,8 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
         Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
+        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(),
+                null, revisionManager, createProperties(), null) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
             }
@@ -175,7 +176,8 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
         Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
+        final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(),
+                null, revisionManager, createProperties(), null) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index 93c9397..7c74680 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -277,7 +277,8 @@ public class Node {
         }
 
         final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener);
-        return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null, revisionManager, nodeProperties);
+        return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null,
+                revisionManager, nodeProperties, protocolSender);
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
index 9f8439c..95381af 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
@@ -17,12 +17,11 @@
 
 package org.apache.nifi.controller;
 
-import java.util.ArrayList;
+import java.io.IOException;
 import java.util.List;
-import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.remote.cluster.ClusterNodeInformation;
 import org.apache.nifi.remote.cluster.NodeInformant;
@@ -37,14 +36,16 @@ public class ClusterCoordinatorNodeInformant implements NodeInformant {
 
     @Override
     public ClusterNodeInformation getNodeInformation() {
-        final List<NodeInformation> nodeInfoCollection = new ArrayList<>();
-        final Set<NodeIdentifier> nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
-
-        // TODO: Get total number of FlowFiles for each node
-        for (final NodeIdentifier nodeId : nodeIds) {
-            final NodeInformation nodeInfo = new NodeInformation(nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(),
-                nodeId.getSiteToSiteHttpApiPort(), nodeId.getApiPort(), nodeId.isSiteToSiteSecure(), 0);
-            nodeInfoCollection.add(nodeInfo);
+        final List<NodeInformation> nodeInfoCollection;
+        try {
+            nodeInfoCollection = clusterCoordinator.getClusterWorkload().entrySet().stream().map(entry -> {
+                final NodeIdentifier nodeId = entry.getKey();
+                final NodeInformation nodeInfo = new NodeInformation(nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(),
+                        nodeId.getSiteToSiteHttpApiPort(), nodeId.getApiPort(), nodeId.isSiteToSiteSecure(), entry.getValue().getFlowFileCount());
+                return nodeInfo;
+            }).collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to retrieve cluster workload due to " + e, e);
         }
 
         final ClusterNodeInformation nodeInfo = new ClusterNodeInformation();

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
index 63e5a35..efb1c26 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
@@ -34,7 +34,7 @@ import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.remote.HttpRemoteSiteListener;
 import org.apache.nifi.remote.VersionNegotiator;
@@ -59,13 +59,13 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 
@@ -216,18 +216,23 @@ public class SiteToSiteResource extends ApplicationResource {
 
         final List<PeerDTO> peers = new ArrayList<>();
         if (properties.isNode()) {
-            final Set<NodeIdentifier> nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
-
-            // TODO: Get total number of FlowFiles for each node
-            for (final NodeIdentifier nodeId : nodeIds) {
-                final PeerDTO peer = new PeerDTO();
-                final String siteToSiteAddress = nodeId.getSiteToSiteAddress();
-                peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress);
-                peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort());
-                peer.setSecure(nodeId.isSiteToSiteSecure());
-                peer.setFlowFileCount(0);
-                peers.add(peer);
+
+            try {
+                final Map<NodeIdentifier, NodeWorkload> clusterWorkload = clusterCoordinator.getClusterWorkload();
+                clusterWorkload.entrySet().stream().forEach(entry -> {
+                    final PeerDTO peer = new PeerDTO();
+                    final NodeIdentifier nodeId = entry.getKey();
+                    final String siteToSiteAddress = nodeId.getSiteToSiteAddress();
+                    peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress);
+                    peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort());
+                    peer.setSecure(nodeId.isSiteToSiteSecure());
+                    peer.setFlowFileCount(entry.getValue().getFlowFileCount());
+                    peers.add(peer);
+                });
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to retrieve cluster workload due to " + e, e);
             }
+
         } else {
             // Standalone mode.
             final PeerDTO peer = new PeerDTO();

http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java
index 60a7ba9..5f3aef3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java
@@ -16,6 +16,9 @@
  */
 package org.apache.nifi.web.api;
 
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.remote.protocol.ResponseCode;
 import org.apache.nifi.remote.protocol.http.HttpHeaders;
 import org.apache.nifi.util.NiFiProperties;
@@ -30,12 +33,17 @@ import org.junit.Test;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
 import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestSiteToSiteResource {
 
@@ -116,6 +124,47 @@ public class TestSiteToSiteResource {
         assertEquals(1, resultEntity.getPeers().size());
     }
 
+    @Test
+    public void testPeersClustered() throws Exception {
+        final HttpServletRequest req = createCommonHttpServletRequest();
+
+        final NiFiServiceFacade serviceFacade = mock(NiFiServiceFacade.class);
+
+        final SiteToSiteResource resource = getSiteToSiteResourceClustered(serviceFacade);
+
+        final ClusterCoordinator clusterCoordinator = mock(ClusterCoordinator.class);
+        final Map<String, NodeWorkload> hostportWorkloads = new HashMap<>();
+        final Map<NodeIdentifier, NodeWorkload> workloads = new HashMap<>();
+        IntStream.range(1, 4).forEach(i -> {
+            final String hostname = "node" + i;
+            final int siteToSiteHttpApiPort = 8110 + i;
+            final NodeIdentifier nodeId = new NodeIdentifier(hostname, hostname, 8080 + i, hostname, 8090 + i, hostname, 8100 + i, siteToSiteHttpApiPort, false);
+            final NodeWorkload workload = new NodeWorkload();
+            workload.setReportedTimestamp(System.currentTimeMillis() - i);
+            workload.setFlowFileBytes(1024 * i);
+            workload.setFlowFileCount(10 * i);
+            workload.setActiveThreadCount(i);
+            workload.setSystemStartTime(System.currentTimeMillis() - (1000 * i));
+            workloads.put(nodeId, workload);
+            hostportWorkloads.put(hostname + ":" + siteToSiteHttpApiPort, workload);
+        });
+        when(clusterCoordinator.getClusterWorkload()).thenReturn(workloads);
+        resource.setClusterCoordinator(clusterCoordinator);
+
+        final Response response = resource.getPeers(req);
+
+        PeersEntity resultEntity = (PeersEntity) response.getEntity();
+
+        assertEquals(200, response.getStatus());
+        assertEquals(3, resultEntity.getPeers().size());
+        resultEntity.getPeers().stream().forEach(peerDTO -> {
+            final NodeWorkload workload = hostportWorkloads.get(peerDTO.getHostname() + ":" + peerDTO.getPort());
+            assertNotNull(workload);
+            assertEquals(workload.getFlowFileCount(), peerDTO.getFlowFileCount());
+        });
+
+    }
+
 
     @Test
     public void testPeersVersionWasNotSpecified() throws Exception {
@@ -160,4 +209,18 @@ public class TestSiteToSiteResource {
         resource.setServiceFacade(serviceFacade);
         return resource;
     }
+
+    private SiteToSiteResource getSiteToSiteResourceClustered(final NiFiServiceFacade serviceFacade) {
+        final Map<String, String> clusterSettings = new HashMap<>();
+        clusterSettings.put(NiFiProperties.CLUSTER_IS_NODE, "true");
+        final NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, clusterSettings);
+        final SiteToSiteResource resource = new SiteToSiteResource(properties) {
+            @Override
+            protected void authorizeSiteToSite() {
+            }
+        };
+        resource.setProperties(properties);
+        resource.setServiceFacade(serviceFacade);
+        return resource;
+    }
 }
\ No newline at end of file