You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/04 18:28:48 UTC

[12/18] nifi git commit: NIFI-1563: - Federate requests and merge responses from nodes instead of storing bulletins and stats at NCM - Updating UI to support restructured status history DTO. - Return 'Insufficient History' message if aggregate stats don'

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatusEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatusEntity.java
new file mode 100644
index 0000000..8b62331
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatusEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Entity placed in the body of an API request or response; it carries one ConnectionStatusDTO under the XML root element "connectionStatusEntity".
+ */
+@XmlRootElement(name = "connectionStatusEntity")
+public class ConnectionStatusEntity extends Entity {
+
+    private ConnectionStatusDTO connectionStatus; // no initializer: stays null until the setter is called
+
+    /**
+     * Returns the ConnectionStatusDTO held by this entity.
+     *
+     * @return the ConnectionStatusDTO last passed to {@link #setConnectionStatus(ConnectionStatusDTO)}, or null if none has been set
+     */
+    public ConnectionStatusDTO getConnectionStatus() {
+        return connectionStatus;
+    }
+
+    public void setConnectionStatus(ConnectionStatusDTO connectionStatus) {
+        this.connectionStatus = connectionStatus; // keeps the caller's reference as-is: no copy, no null check
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeSystemDiagnosticsEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeSystemDiagnosticsEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeSystemDiagnosticsEntity.java
deleted file mode 100644
index 443276c..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeSystemDiagnosticsEntity.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.web.api.entity;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
-
-/**
- * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a NodeSystemDiagnosticsDTO.
- */
-@XmlRootElement(name = "nodeSystemDiagnosticsEntity")
-public class NodeSystemDiagnosticsEntity extends Entity {
-
-    private NodeSystemDiagnosticsDTO nodeSystemDiagnostics;
-
-    /**
-     * The NodeSystemDiagnosticsDTO that is being serialized.
-     *
-     * @return The NodeSystemDiagnosticsDTO object
-     */
-    public NodeSystemDiagnosticsDTO getNodeSystemDiagnostics() {
-        return nodeSystemDiagnostics;
-    }
-
-    public void setNodeSystemDiagnostics(NodeSystemDiagnosticsDTO nodeSystemDiagnostics) {
-        this.nodeSystemDiagnostics = nodeSystemDiagnostics;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortStatusEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortStatusEntity.java
new file mode 100644
index 0000000..e0b49c4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortStatusEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Entity placed in the body of an API request or response; it carries one PortStatusDTO under the XML root element "portStatusEntity".
+ */
+@XmlRootElement(name = "portStatusEntity")
+public class PortStatusEntity extends Entity {
+
+    private PortStatusDTO portStatus; // no initializer: stays null until the setter is called
+
+    /**
+     * Returns the PortStatusDTO held by this entity.
+     *
+     * @return the PortStatusDTO last passed to {@link #setPortStatus(PortStatusDTO)}, or null if none has been set
+     */
+    public PortStatusDTO getPortStatus() {
+        return portStatus;
+    }
+
+    public void setPortStatus(PortStatusDTO portStatus) {
+        this.portStatus = portStatus; // keeps the caller's reference as-is: no copy, no null check
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorStatusEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorStatusEntity.java
new file mode 100644
index 0000000..0c2170c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorStatusEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Entity placed in the body of an API request or response; it carries one ProcessorStatusDTO under the XML root element "processorStatusEntity".
+ */
+@XmlRootElement(name = "processorStatusEntity")
+public class ProcessorStatusEntity extends Entity {
+
+    private ProcessorStatusDTO processorStatus; // no initializer: stays null until the setter is called
+
+    /**
+     * Returns the ProcessorStatusDTO held by this entity.
+     *
+     * @return the ProcessorStatusDTO last passed to {@link #setProcessorStatus(ProcessorStatusDTO)}, or null if none has been set
+     */
+    public ProcessorStatusDTO getProcessorStatus() {
+        return processorStatus;
+    }
+
+    public void setProcessorStatus(ProcessorStatusDTO processorStatus) {
+        this.processorStatus = processorStatus; // keeps the caller's reference as-is: no copy, no null check
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupStatusEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupStatusEntity.java
new file mode 100644
index 0000000..a5031ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupStatusEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Entity placed in the body of an API request or response; it carries one RemoteProcessGroupStatusDTO under the XML root element "remoteProcessGroupStatusEntity".
+ */
+@XmlRootElement(name = "remoteProcessGroupStatusEntity")
+public class RemoteProcessGroupStatusEntity extends Entity {
+
+    private RemoteProcessGroupStatusDTO remoteProcessGroupStatus; // no initializer: stays null until the setter is called
+
+    /**
+     * Returns the RemoteProcessGroupStatusDTO held by this entity.
+     *
+     * @return the RemoteProcessGroupStatusDTO last passed to {@link #setRemoteProcessGroupStatus(RemoteProcessGroupStatusDTO)}, or null if none has been set
+     */
+    public RemoteProcessGroupStatusDTO getRemoteProcessGroupStatus() {
+        return remoteProcessGroupStatus;
+    }
+
+    public void setRemoteProcessGroupStatus(RemoteProcessGroupStatusDTO remoteProcessGroupStatus) {
+        this.remoteProcessGroupStatus = remoteProcessGroupStatus; // keeps the caller's reference as-is: no copy, no null check
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
index f3e5df4..be0c339 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
@@ -20,7 +20,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 
 /**
@@ -52,15 +51,6 @@ public interface NodeProtocolSender {
     void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException;
 
     /**
-     * Sends a bulletins message to the cluster manager.
-     *
-     * @param msg a message
-     * @throws ProtocolException pe
-     * @throws UnknownServiceAddressException ex
-     */
-    void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
-    /**
      * Sends a failure notification if the controller was unable start.
      *
      * @param msg a message

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
index 993dea5..9ae6182 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
@@ -32,7 +32,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
@@ -117,11 +116,6 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
     }
 
     @Override
-    public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sendProtocolMessage(msg);
-    }
-
-    @Override
     public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
         sendProtocolMessage(msg);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
index 2992e38..0a9a064 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
@@ -24,7 +24,6 @@ import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.ProtocolListener;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
@@ -104,11 +103,6 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL
     }
 
     @Override
-    public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sender.sendBulletins(msg);
-    }
-
-    @Override
     public void setBulletinRepository(final BulletinRepository bulletinRepository) {
         listener.setBulletinRepository(bulletinRepository);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index f0a9fa7..516b67e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -26,7 +26,6 @@ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
 import org.apache.nifi.cluster.protocol.message.PingMessage;
 import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
@@ -97,8 +96,4 @@ public class ObjectFactory {
     public PrimaryRoleAssignmentMessage createPrimaryRoleAssignmentMessage() {
         return new PrimaryRoleAssignmentMessage();
     }
-
-    public NodeBulletinsMessage createBulletinsMessage() {
-        return new NodeBulletinsMessage();
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java
deleted file mode 100644
index 6df3ba4..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.message;
-
-import org.apache.nifi.cluster.protocol.NodeBulletins;
-import javax.xml.bind.annotation.XmlRootElement;
-
-/**
- */
-@XmlRootElement(name = "nodeBulletinsMessage")
-public class NodeBulletinsMessage extends ProtocolMessage {
-
-    private NodeBulletins bulletins;
-
-    @Override
-    public MessageType getType() {
-        return MessageType.BULLETINS;
-    }
-
-    public NodeBulletins getBulletins() {
-        return bulletins;
-    }
-
-    public void setBulletins(NodeBulletins bulletins) {
-        this.bulletins = bulletins;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index c6f7ce0..f01efd8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -21,8 +21,6 @@ public abstract class ProtocolMessage {
     private volatile String requestorDN;
 
     public static enum MessageType {
-
-        BULLETINS,
         CONNECTION_REQUEST,
         CONNECTION_RESPONSE,
         CONTROLLER_STARTUP_FAILURE,

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
index 336d675..51de54b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
@@ -33,8 +33,6 @@ import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
 import org.apache.nifi.remote.cluster.NodeInformant;
 import org.apache.nifi.reporting.BulletinRepository;
 
@@ -168,15 +166,4 @@ public interface ClusterManager extends NodeInformant {
      * @return the bulletin repository
      */
     BulletinRepository getBulletinRepository();
-
-    /**
-     * @param groupId groupId
-     * @return a {@link ProcessGroupStatus} that represents the status of all nodes with the given {@link Status}es for the given ProcessGroup id, or null if no nodes exist with the given statuses
-     */
-    ProcessGroupStatus getProcessGroupStatus(String groupId);
-
-    /**
-     * @return a merged representation of the System Diagnostics for all nodes in the cluster
-     */
-    SystemDiagnostics getSystemDiagnostics();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
new file mode 100644
index 0000000..66ad494
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.manager;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.RunStatus;
+import org.apache.nifi.controller.status.TransmissionStatus;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.web.api.dto.BulletinDTO;
+import org.apache.nifi.web.api.dto.CounterDTO;
+import org.apache.nifi.web.api.dto.CountersDTO;
+import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
+import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO;
+import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.StorageUsageDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
+
+public class StatusMerger {
+    /**
+     * Folds the controller status reported by one node into an accumulated controller status.
+     * The numeric counts are summed, the three bulletin lists are concatenated, and the
+     * pretty-printed fields of {@code target} are recomputed afterwards. Nothing happens if
+     * either argument is {@code null}.
+     *
+     * @param target the accumulated status; updated in place
+     * @param toMerge the status whose values are added to {@code target}
+     */
+    public static void merge(final ControllerStatusDTO target, final ControllerStatusDTO toMerge) {
+        if (target == null) {
+            return;
+        }
+        if (toMerge == null) {
+            return;
+        }
+
+        // raw counters: each one is the sum of both sides
+        target.setActiveRemotePortCount(target.getActiveRemotePortCount() + toMerge.getActiveRemotePortCount());
+        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
+        target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
+        target.setDisabledCount(target.getDisabledCount() + toMerge.getDisabledCount());
+        target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
+        target.setInactiveRemotePortCount(target.getInactiveRemotePortCount() + toMerge.getInactiveRemotePortCount());
+        target.setInvalidCount(target.getInvalidCount() + toMerge.getInvalidCount());
+        target.setRunningCount(target.getRunningCount() + toMerge.getRunningCount());
+        target.setStoppedCount(target.getStoppedCount() + toMerge.getStoppedCount());
+
+        // bulletins: target's entries first, followed by the entries being merged in
+        final List<BulletinDTO> generalBulletins = mergeBulletins(target.getBulletins(), toMerge.getBulletins());
+        target.setBulletins(generalBulletins);
+        final List<BulletinDTO> serviceBulletins = mergeBulletins(target.getControllerServiceBulletins(), toMerge.getControllerServiceBulletins());
+        target.setControllerServiceBulletins(serviceBulletins);
+        final List<BulletinDTO> taskBulletins = mergeBulletins(target.getReportingTaskBulletins(), toMerge.getReportingTaskBulletins());
+        target.setReportingTaskBulletins(taskBulletins);
+
+        // the display strings are derived from the raw values, so refresh them last
+        updatePrettyPrintedFields(target);
+    }
+
+    /**
+     * Recomputes the display strings of a controller status from its raw values: the queued
+     * summary is built from the queued FlowFile count and byte size, and the connected-nodes
+     * summary is rendered as "connected / total".
+     *
+     * @param target the DTO to update
+     */
+    public static void updatePrettyPrintedFields(final ControllerStatusDTO target) {
+        final String queuedSummary = prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued());
+        target.setQueued(queuedSummary);
+
+        final String connectedText = formatCount(target.getConnectedNodeCount());
+        final String totalText = formatCount(target.getTotalNodeCount());
+        target.setConnectedNodes(connectedText + " / " + totalText);
+    }
+
+    /**
+     * Concatenates two bulletin lists into a newly allocated list. Neither input list is
+     * modified, and a {@code null} input is treated as empty.
+     *
+     * @param targetBulletins the bulletins that come first in the result; may be {@code null}
+     * @param toMerge the bulletins appended after {@code targetBulletins}; may be {@code null}
+     * @return a new list holding the elements of both inputs; never {@code null}
+     */
+    public static List<BulletinDTO> mergeBulletins(final List<BulletinDTO> targetBulletins, final List<BulletinDTO> toMerge) {
+        final List<BulletinDTO> combined;
+        if (targetBulletins == null) {
+            combined = new ArrayList<>();
+        } else {
+            combined = new ArrayList<>(targetBulletins);
+        }
+
+        if (toMerge != null) {
+            combined.addAll(toMerge);
+        }
+        return combined;
+    }
+
+
+    /**
+     * Merges a node's process group status into the accumulated status. The node's aggregate
+     * snapshot is folded into the aggregate snapshot of {@code target}; if {@code target}
+     * already carries a node-snapshot collection, an entry for this node is appended to it.
+     * The appended entry references the aggregate snapshot object of {@code toMerge} itself,
+     * not a copy.
+     *
+     * @param target the accumulated status; updated in place
+     * @param toMerge the status reported by the node
+     * @param nodeId identifier of the node that produced {@code toMerge}
+     * @param nodeAddress address of that node
+     * @param nodeApiPort API port of that node
+     */
+    public static void merge(final ProcessGroupStatusDTO target, final ProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        // per-node detail is only recorded when the target has somewhere to put it
+        if (target.getNodeSnapshots() == null) {
+            return;
+        }
+
+        final NodeProcessGroupStatusSnapshotDTO perNode = new NodeProcessGroupStatusSnapshotDTO();
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        perNode.setStatusSnapshot(toMerge.getAggregateSnapshot());
+        target.getNodeSnapshots().add(perNode);
+    }
+
+    public static void merge(final ProcessGroupStatusSnapshotDTO target, final ProcessGroupStatusSnapshotDTO toMerge) {
+        if (target == null || toMerge == null) {
+            return;
+        }
+
+        target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
+        target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());
+
+        target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
+        target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
+
+        target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
+        target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());
+
+        target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
+        target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut());
+
+        target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred());
+        target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred());
+
+        target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived());
+        target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived());
+
+        target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
+        target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());
+
+        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
+        updatePrettyPrintedFields(target);
+
+        // connection status
+        // sort by id
+        final Map<String, ConnectionStatusSnapshotDTO> mergedConnectionMap = new HashMap<>();
+        for (final ConnectionStatusSnapshotDTO status : replaceNull(target.getConnectionStatusSnapshots())) {
+            mergedConnectionMap.put(status.getId(), status);
+        }
+
+        for (final ConnectionStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getConnectionStatusSnapshots())) {
+            ConnectionStatusSnapshotDTO merged = mergedConnectionMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedConnectionMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        target.setConnectionStatusSnapshots(mergedConnectionMap.values());
+
+        // processor status
+        final Map<String, ProcessorStatusSnapshotDTO> mergedProcessorMap = new HashMap<>();
+        for (final ProcessorStatusSnapshotDTO status : replaceNull(target.getProcessorStatusSnapshots())) {
+            mergedProcessorMap.put(status.getId(), status);
+        }
+
+        for (final ProcessorStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getProcessorStatusSnapshots())) {
+            ProcessorStatusSnapshotDTO merged = mergedProcessorMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedProcessorMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        target.setProcessorStatusSnapshots(mergedProcessorMap.values());
+
+
+        // input ports
+        final Map<String, PortStatusSnapshotDTO> mergedInputPortMap = new HashMap<>();
+        for (final PortStatusSnapshotDTO status : replaceNull(target.getInputPortStatusSnapshots())) {
+            mergedInputPortMap.put(status.getId(), status);
+        }
+
+        for (final PortStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getInputPortStatusSnapshots())) {
+            PortStatusSnapshotDTO merged = mergedInputPortMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedInputPortMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        target.setInputPortStatusSnapshots(mergedInputPortMap.values());
+
+        // output ports
+        final Map<String, PortStatusSnapshotDTO> mergedOutputPortMap = new HashMap<>();
+        for (final PortStatusSnapshotDTO status : replaceNull(target.getOutputPortStatusSnapshots())) {
+            mergedOutputPortMap.put(status.getId(), status);
+        }
+
+        for (final PortStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getOutputPortStatusSnapshots())) {
+            PortStatusSnapshotDTO merged = mergedOutputPortMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedOutputPortMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        target.setOutputPortStatusSnapshots(mergedOutputPortMap.values());
+
+        // child groups
+        final Map<String, ProcessGroupStatusSnapshotDTO> mergedGroupMap = new HashMap<>();
+        for (final ProcessGroupStatusSnapshotDTO status : replaceNull(target.getProcessGroupStatusSnapshots())) {
+            mergedGroupMap.put(status.getId(), status);
+        }
+
+        for (final ProcessGroupStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getProcessGroupStatusSnapshots())) {
+            ProcessGroupStatusSnapshotDTO merged = mergedGroupMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedGroupMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        target.setOutputPortStatusSnapshots(mergedOutputPortMap.values());
+
+        // remote groups
+        final Map<String, RemoteProcessGroupStatusSnapshotDTO> mergedRemoteGroupMap = new HashMap<>();
+        for (final RemoteProcessGroupStatusSnapshotDTO status : replaceNull(target.getRemoteProcessGroupStatusSnapshots())) {
+            mergedRemoteGroupMap.put(status.getId(), status);
+        }
+
+        for (final RemoteProcessGroupStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getRemoteProcessGroupStatusSnapshots())) {
+            RemoteProcessGroupStatusSnapshotDTO merged = mergedRemoteGroupMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedRemoteGroupMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        target.setRemoteProcessGroupStatusSnapshots(mergedRemoteGroupMap.values());
+    }
+
+    /**
+     * Substitutes an empty collection for {@code null} so callers can iterate unconditionally.
+     *
+     * @param collection the collection to check; may be {@code null}
+     * @return {@code collection} itself when it is non-null, otherwise an empty list
+     */
+    private static <T> Collection<T> replaceNull(final Collection<T> collection) {
+        if (collection != null) {
+            return collection;
+        }
+        return Collections.<T> emptyList();
+    }
+
+
+    /**
+     * Refreshes every "pretty printed" field of a process group snapshot from the raw values
+     * currently stored in it. For instance, {@link ProcessGroupStatusSnapshotDTO#setInput(String)}
+     * receives the formatted combination of {@link ProcessGroupStatusSnapshotDTO#getFlowFilesIn()}
+     * and {@link ProcessGroupStatusSnapshotDTO#getBytesIn()}.
+     *
+     * The formatting lives here instead of inside the DTO because the DTO has to remain a plain
+     * holder of getters and setters; anything more makes the automatic marshalling and
+     * unmarshalling to and from JSON much harder.
+     *
+     * @param target the DTO to update
+     */
+    public static void updatePrettyPrintedFields(final ProcessGroupStatusSnapshotDTO target) {
+        // queued: combined summary plus the separate count and size strings
+        final Integer queuedFlowFiles = target.getFlowFilesQueued();
+        final Long queuedBytes = target.getBytesQueued();
+        target.setQueued(prettyPrint(queuedFlowFiles, queuedBytes));
+        target.setQueuedCount(formatCount(queuedFlowFiles));
+        target.setQueuedSize(formatDataSize(queuedBytes));
+
+        final String inputSummary = prettyPrint(target.getFlowFilesIn(), target.getBytesIn());
+        target.setInput(inputSummary);
+
+        target.setRead(formatDataSize(target.getBytesRead()));
+        target.setWritten(formatDataSize(target.getBytesWritten()));
+
+        final String outputSummary = prettyPrint(target.getFlowFilesOut(), target.getBytesOut());
+        target.setOutput(outputSummary);
+
+        final String transferredSummary = prettyPrint(target.getFlowFilesTransferred(), target.getBytesTransferred());
+        target.setTransferred(transferredSummary);
+
+        final String receivedSummary = prettyPrint(target.getFlowFilesReceived(), target.getBytesReceived());
+        target.setReceived(receivedSummary);
+
+        final String sentSummary = prettyPrint(target.getFlowFilesSent(), target.getBytesSent());
+        target.setSent(sentSummary);
+    }
+
+    /**
+     * Merges a node's remote process group status into the accumulated status. The node's
+     * aggregate snapshot is folded into the aggregate snapshot of {@code target}; if
+     * {@code target} already carries a node-snapshot collection, an entry for this node is
+     * appended to it. The appended entry references the aggregate snapshot object of
+     * {@code toMerge} itself, not a copy.
+     *
+     * @param target the accumulated status; updated in place
+     * @param toMerge the status reported by the node
+     * @param nodeId identifier of the node that produced {@code toMerge}
+     * @param nodeAddress address of that node
+     * @param nodeApiPort API port of that node
+     */
+    public static void merge(final RemoteProcessGroupStatusDTO target, final RemoteProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        // per-node detail is only recorded when the target has somewhere to put it
+        if (target.getNodeSnapshots() == null) {
+            return;
+        }
+
+        final NodeRemoteProcessGroupStatusSnapshotDTO perNode = new NodeRemoteProcessGroupStatusSnapshotDTO();
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        perNode.setStatusSnapshot(toMerge.getAggregateSnapshot());
+        target.getNodeSnapshots().add(perNode);
+    }
+
+    /**
+     * Merges a node's port status into the accumulated status. The node's aggregate snapshot
+     * is folded into the aggregate snapshot of {@code target}; if {@code target} already
+     * carries a node-snapshot collection, an entry for this node is appended to it. The
+     * appended entry references the aggregate snapshot object of {@code toMerge} itself, not
+     * a copy.
+     *
+     * @param target the accumulated status; updated in place
+     * @param toMerge the status reported by the node
+     * @param nodeId identifier of the node that produced {@code toMerge}
+     * @param nodeAddress address of that node
+     * @param nodeApiPort API port of that node
+     */
+    public static void merge(final PortStatusDTO target, final PortStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        // per-node detail is only recorded when the target has somewhere to put it
+        if (target.getNodeSnapshots() == null) {
+            return;
+        }
+
+        final NodePortStatusSnapshotDTO perNode = new NodePortStatusSnapshotDTO();
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        perNode.setStatusSnapshot(toMerge.getAggregateSnapshot());
+        target.getNodeSnapshots().add(perNode);
+    }
+
+    /**
+     * Merges a node's connection status into the accumulated status. The node's aggregate
+     * snapshot is folded into the aggregate snapshot of {@code target}; if {@code target}
+     * already carries a node-snapshot collection, an entry for this node is appended to it.
+     * The appended entry references the aggregate snapshot object of {@code toMerge} itself,
+     * not a copy.
+     *
+     * @param target the accumulated status; updated in place
+     * @param toMerge the status reported by the node
+     * @param nodeId identifier of the node that produced {@code toMerge}
+     * @param nodeAddress address of that node
+     * @param nodeApiPort API port of that node
+     */
+    public static void merge(final ConnectionStatusDTO target, final ConnectionStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        // per-node detail is only recorded when the target has somewhere to put it
+        if (target.getNodeSnapshots() == null) {
+            return;
+        }
+
+        final NodeConnectionStatusSnapshotDTO perNode = new NodeConnectionStatusSnapshotDTO();
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        perNode.setStatusSnapshot(toMerge.getAggregateSnapshot());
+        target.getNodeSnapshots().add(perNode);
+    }
+
+    /**
+     * Merges a node's processor status into the accumulated status. The node's aggregate
+     * snapshot is folded into the aggregate snapshot of {@code target}; if {@code target}
+     * already carries a node-snapshot collection, an entry for this node is appended to it.
+     * The appended entry references the aggregate snapshot object of {@code toMerge} itself,
+     * not a copy.
+     *
+     * @param target the accumulated status; updated in place
+     * @param toMerge the status reported by the node
+     * @param nodeId identifier of the node that produced {@code toMerge}
+     * @param nodeAddress address of that node
+     * @param nodeApiPort API port of that node
+     */
+    public static void merge(final ProcessorStatusDTO target, final ProcessorStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        // per-node detail is only recorded when the target has somewhere to put it
+        if (target.getNodeSnapshots() == null) {
+            return;
+        }
+
+        final NodeProcessorStatusSnapshotDTO perNode = new NodeProcessorStatusSnapshotDTO();
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        perNode.setStatusSnapshot(toMerge.getAggregateSnapshot());
+        target.getNodeSnapshots().add(perNode);
+    }
+
+    /**
+     * Merges one processor status snapshot into another: the counters are summed, an Invalid
+     * run status on the incoming side overrides the target's run status, and the target's
+     * pretty-printed fields are refreshed. Does nothing if either argument is {@code null}.
+     *
+     * @param target the accumulated snapshot; updated in place
+     * @param toMerge the snapshot whose values are added to {@code target}
+     */
+    public static void merge(final ProcessorStatusSnapshotDTO target, final ProcessorStatusSnapshotDTO toMerge) {
+        if (target == null) {
+            return;
+        }
+        if (toMerge == null) {
+            return;
+        }
+
+        // Disabled/stopped/running comes from the flow configuration and should be the same on
+        // every node. Validity, on the other hand, can depend on a node's environment, so an
+        // Invalid status reported by any node wins over the configured run status.
+        final String invalidStatus = RunStatus.Invalid.name();
+        if (invalidStatus.equals(toMerge.getRunStatus())) {
+            target.setRunStatus(invalidStatus);
+        }
+
+        target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
+        target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());
+        target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());
+        target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
+        target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut());
+        target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
+        target.setTaskCount(target.getTaskCount() + toMerge.getTaskCount());
+        target.setTasksDurationNanos(target.getTasksDurationNanos() + toMerge.getTasksDurationNanos());
+        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
+
+        // display strings are derived from the summed values
+        updatePrettyPrintedFields(target);
+    }
+
+    /**
+     * Recomputes the display strings of a processor snapshot (input, read, written, output,
+     * tasks and task duration) from its raw values. A {@code null} task count is rendered
+     * as "-".
+     *
+     * @param target the DTO to update
+     */
+    public static void updatePrettyPrintedFields(final ProcessorStatusSnapshotDTO target) {
+        final String inputSummary = prettyPrint(target.getFlowFilesIn(), target.getBytesIn());
+        target.setInput(inputSummary);
+        target.setRead(formatDataSize(target.getBytesRead()));
+        target.setWritten(formatDataSize(target.getBytesWritten()));
+        final String outputSummary = prettyPrint(target.getFlowFilesOut(), target.getBytesOut());
+        target.setOutput(outputSummary);
+
+        // formatCount already maps a null count to "-"
+        target.setTasks(formatCount(target.getTaskCount()));
+
+        final String duration = FormatUtils.formatHoursMinutesSeconds(target.getTasksDurationNanos(), TimeUnit.NANOSECONDS);
+        target.setTasksDuration(duration);
+    }
+
+
+    /**
+     * Merges one connection status snapshot into another by summing the in, out and queued
+     * FlowFile counts and byte sizes, then refreshing the target's pretty-printed fields.
+     * Does nothing if either argument is {@code null}.
+     *
+     * @param target the accumulated snapshot; updated in place
+     * @param toMerge the snapshot whose values are added to {@code target}
+     */
+    public static void merge(final ConnectionStatusSnapshotDTO target, final ConnectionStatusSnapshotDTO toMerge) {
+        if (target == null) {
+            return;
+        }
+        if (toMerge == null) {
+            return;
+        }
+
+        // in
+        target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());
+        target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
+
+        // out
+        target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut());
+        target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
+
+        // queued
+        target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
+        target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
+
+        updatePrettyPrintedFields(target);
+    }
+
+    /**
+     * Recomputes the display strings of a connection snapshot (queued summary, queued count,
+     * queued size, input and output) from its raw values.
+     *
+     * @param target the DTO to update
+     */
+    public static void updatePrettyPrintedFields(final ConnectionStatusSnapshotDTO target) {
+        final Integer queuedFlowFiles = target.getFlowFilesQueued();
+        final Long queuedBytes = target.getBytesQueued();
+        target.setQueued(prettyPrint(queuedFlowFiles, queuedBytes));
+        target.setQueuedCount(formatCount(queuedFlowFiles));
+        target.setQueuedSize(formatDataSize(queuedBytes));
+
+        final String inputSummary = prettyPrint(target.getFlowFilesIn(), target.getBytesIn());
+        target.setInput(inputSummary);
+        final String outputSummary = prettyPrint(target.getFlowFilesOut(), target.getBytesOut());
+        target.setOutput(outputSummary);
+    }
+
+
+
+    /**
+     * Merges one remote process group status snapshot into another. The merged snapshot is
+     * marked Transmitting if either side is transmitting, the authorization issues of both
+     * sides are concatenated into a new list, the thread count and sent/received counters are
+     * summed, and the target's pretty-printed fields are refreshed. Does nothing if either
+     * argument is {@code null}.
+     *
+     * @param target the accumulated snapshot; updated in place
+     * @param toMerge the snapshot whose values are added to {@code target}
+     */
+    public static void merge(final RemoteProcessGroupStatusSnapshotDTO target, final RemoteProcessGroupStatusSnapshotDTO toMerge) {
+        // Callers pass aggregate snapshots straight from their DTOs, so either side may be
+        // missing; skip quietly like the other snapshot merge methods in this class do.
+        if (target == null || toMerge == null) {
+            return;
+        }
+
+        final String transmittingValue = TransmissionStatus.Transmitting.name();
+        if (transmittingValue.equals(target.getTransmissionStatus()) || transmittingValue.equals(toMerge.getTransmissionStatus())) {
+            target.setTransmissionStatus(transmittingValue);
+        }
+
+        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
+
+        // build a fresh list so neither side's existing list is mutated
+        final List<String> authIssues = new ArrayList<>();
+        if (target.getAuthorizationIssues() != null) {
+            authIssues.addAll(target.getAuthorizationIssues());
+        }
+        if (toMerge.getAuthorizationIssues() != null) {
+            authIssues.addAll(toMerge.getAuthorizationIssues());
+        }
+        target.setAuthorizationIssues(authIssues);
+
+        target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());
+        target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
+        target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived());
+        target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived());
+        updatePrettyPrintedFields(target);
+    }
+
+    /**
+     * Recomputes the received and sent display strings of a remote process group snapshot
+     * from its raw FlowFile counts and byte sizes.
+     *
+     * @param target the DTO to update
+     */
+    public static void updatePrettyPrintedFields(final RemoteProcessGroupStatusSnapshotDTO target) {
+        final String receivedSummary = prettyPrint(target.getFlowFilesReceived(), target.getBytesReceived());
+        target.setReceived(receivedSummary);
+
+        final String sentSummary = prettyPrint(target.getFlowFilesSent(), target.getBytesSent());
+        target.setSent(sentSummary);
+    }
+
+
+
+    /**
+     * Merges one port status snapshot into another: the thread count and in/out counters are
+     * summed, the merged snapshot is transmitting if either side is, an Invalid run status on
+     * the incoming side overrides the target's run status, and the target's pretty-printed
+     * fields are refreshed. Does nothing if either argument is {@code null}.
+     *
+     * @param target the accumulated snapshot; updated in place
+     * @param toMerge the snapshot whose values are added to {@code target}
+     */
+    public static void merge(final PortStatusSnapshotDTO target, final PortStatusSnapshotDTO toMerge) {
+        if (target == null) {
+            return;
+        }
+        if (toMerge == null) {
+            return;
+        }
+
+        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
+        target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());
+        target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
+        target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut());
+        target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
+
+        final boolean targetTransmitting = Boolean.TRUE.equals(target.isTransmitting());
+        target.setTransmitting(targetTransmitting || Boolean.TRUE.equals(toMerge.isTransmitting()));
+
+        // A port's run status should not be affected by environmental conditions, so this
+        // should be unnecessary; it is done anyway in case that ever changes.
+        final String invalidStatus = RunStatus.Invalid.name();
+        if (invalidStatus.equals(toMerge.getRunStatus())) {
+            target.setRunStatus(invalidStatus);
+        }
+
+        updatePrettyPrintedFields(target);
+    }
+
+    /**
+     * Recomputes the input and output display strings of a port snapshot from its raw
+     * FlowFile counts and byte sizes.
+     *
+     * @param target the DTO to update
+     */
+    public static void updatePrettyPrintedFields(final PortStatusSnapshotDTO target) {
+        final String inputSummary = prettyPrint(target.getFlowFilesIn(), target.getBytesIn());
+        target.setInput(inputSummary);
+
+        final String outputSummary = prettyPrint(target.getFlowFilesOut(), target.getBytesOut());
+        target.setOutput(outputSummary);
+    }
+
+
+    /**
+     * Merges a node's system diagnostics into the accumulated diagnostics. The node's
+     * aggregate snapshot is folded into the aggregate snapshot of {@code target}, and an entry
+     * for the node is always appended to the target's node snapshots; the list is created if
+     * {@code target} does not have one yet. The appended entry references the aggregate
+     * snapshot object of {@code toMerge} itself, not a copy.
+     *
+     * @param target the accumulated diagnostics; updated in place
+     * @param toMerge the diagnostics reported by the node
+     * @param nodeId identifier of the node that produced {@code toMerge}
+     * @param nodeAddress address of that node
+     * @param nodeApiPort API port of that node
+     */
+    public static void merge(final SystemDiagnosticsDTO target, final SystemDiagnosticsDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        final List<NodeSystemDiagnosticsSnapshotDTO> existing = target.getNodeSnapshots();
+        final List<NodeSystemDiagnosticsSnapshotDTO> nodeSnapshots;
+        if (existing != null) {
+            nodeSnapshots = existing;
+        } else {
+            nodeSnapshots = new ArrayList<>();
+        }
+
+        final NodeSystemDiagnosticsSnapshotDTO perNode = new NodeSystemDiagnosticsSnapshotDTO();
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        perNode.setSnapshot(toMerge.getAggregateSnapshot());
+        nodeSnapshots.add(perNode);
+
+        target.setNodeSnapshots(nodeSnapshots);
+    }
+
+    /**
+     * Merges one system diagnostics snapshot into another. Every numeric value (processors,
+     * threads, load average, heap and non-heap sizes) is summed, the storage usage and garbage
+     * collection sets are merged entry by entry, and the target's pretty-printed fields are
+     * refreshed. Does nothing if either argument is {@code null}.
+     *
+     * NOTE(review): the pretty-printing step treats a maximum of -1 specially; summing two
+     * such values gives -2 - confirm whether that case can occur here.
+     *
+     * @param target the accumulated snapshot; updated in place
+     * @param toMerge the snapshot whose values are added to {@code target}
+     */
+    public static void merge(final SystemDiagnosticsSnapshotDTO target, final SystemDiagnosticsSnapshotDTO toMerge) {
+        if (target == null) {
+            return;
+        }
+        if (toMerge == null) {
+            return;
+        }
+
+        // scalar values: each one is the sum of both sides
+        target.setAvailableProcessors(target.getAvailableProcessors() + toMerge.getAvailableProcessors());
+        target.setDaemonThreads(target.getDaemonThreads() + toMerge.getDaemonThreads());
+        target.setFreeHeapBytes(target.getFreeHeapBytes() + toMerge.getFreeHeapBytes());
+        target.setFreeNonHeapBytes(target.getFreeNonHeapBytes() + toMerge.getFreeNonHeapBytes());
+        target.setMaxHeapBytes(target.getMaxHeapBytes() + toMerge.getMaxHeapBytes());
+        target.setMaxNonHeapBytes(target.getMaxNonHeapBytes() + toMerge.getMaxNonHeapBytes());
+        target.setProcessorLoadAverage(target.getProcessorLoadAverage() + toMerge.getProcessorLoadAverage());
+        target.setTotalHeapBytes(target.getTotalHeapBytes() + toMerge.getTotalHeapBytes());
+        target.setTotalNonHeapBytes(target.getTotalNonHeapBytes() + toMerge.getTotalNonHeapBytes());
+        target.setTotalThreads(target.getTotalThreads() + toMerge.getTotalThreads());
+        target.setUsedHeapBytes(target.getUsedHeapBytes() + toMerge.getUsedHeapBytes());
+        target.setUsedNonHeapBytes(target.getUsedNonHeapBytes() + toMerge.getUsedNonHeapBytes());
+
+        // keyed sets: merged in place inside the target's own set instances
+        merge(target.getContentRepositoryStorageUsage(), toMerge.getContentRepositoryStorageUsage());
+        merge(target.getFlowFileRepositoryStorageUsage(), toMerge.getFlowFileRepositoryStorageUsage());
+        mergeGarbageCollection(target.getGarbageCollection(), toMerge.getGarbageCollection());
+
+        // display strings are derived from the summed values
+        updatePrettyPrintedFields(target);
+    }
+
+    /**
+     * Recomputes the heap and non-heap display strings of a system diagnostics snapshot from
+     * its raw byte values. A utilization percentage is only written when the corresponding
+     * maximum is not -1 (presumably -1 marks an undefined maximum - confirm); otherwise the
+     * existing utilization string is left untouched.
+     *
+     * @param target the DTO to update
+     */
+    public static void updatePrettyPrintedFields(final SystemDiagnosticsSnapshotDTO target) {
+        // heap sizes
+        target.setMaxHeap(FormatUtils.formatDataSize(target.getMaxHeapBytes()));
+        target.setTotalHeap(FormatUtils.formatDataSize(target.getTotalHeapBytes()));
+        target.setUsedHeap(FormatUtils.formatDataSize(target.getUsedHeapBytes()));
+        target.setFreeHeap(FormatUtils.formatDataSize(target.getFreeHeapBytes()));
+
+        // heap utilization, relative to the maximum
+        if (target.getMaxHeapBytes() != -1) {
+            final int heapPercent = getUtilization(target.getUsedHeapBytes(), target.getMaxHeapBytes());
+            target.setHeapUtilization(FormatUtils.formatUtilization(heapPercent));
+        }
+
+        // non-heap sizes
+        target.setMaxNonHeap(FormatUtils.formatDataSize(target.getMaxNonHeapBytes()));
+        target.setTotalNonHeap(FormatUtils.formatDataSize(target.getTotalNonHeapBytes()));
+        target.setUsedNonHeap(FormatUtils.formatDataSize(target.getUsedNonHeapBytes()));
+        target.setFreeNonHeap(FormatUtils.formatDataSize(target.getFreeNonHeapBytes()));
+
+        // non-heap utilization, relative to the maximum
+        if (target.getMaxNonHeapBytes() != -1) {
+            final int nonHeapPercent = getUtilization(target.getUsedNonHeapBytes(), target.getMaxNonHeapBytes());
+            target.setNonHeapUtilization(FormatUtils.formatUtilization(nonHeapPercent));
+        }
+    }
+
+    /**
+     * Merges a set of storage usages into another, matching entries by identifier. Entries
+     * present on both sides are merged into the target's instance; entries present only in
+     * {@code toMerge} are added to {@code targetSet} as the same object instance (no copy is
+     * made). The contents of {@code targetSet} are replaced with the merged result.
+     *
+     * NOTE(review): neither set is checked for {@code null} before being iterated.
+     *
+     * @param targetSet the accumulated usages; its contents are replaced in place
+     * @param toMerge the usages to merge in
+     */
+    public static void merge(final Set<StorageUsageDTO> targetSet, final Set<StorageUsageDTO> toMerge) {
+        // index the accumulated entries by identifier
+        final Map<String, StorageUsageDTO> usageByIdentifier = new HashMap<>();
+        for (final StorageUsageDTO existing : targetSet) {
+            usageByIdentifier.put(existing.getIdentifier(), existing);
+        }
+
+        for (final StorageUsageDTO incoming : toMerge) {
+            final String identifier = incoming.getIdentifier();
+            final StorageUsageDTO match = usageByIdentifier.get(identifier);
+            if (match != null) {
+                merge(match, incoming);
+            } else {
+                usageByIdentifier.put(identifier, incoming);
+            }
+        }
+
+        // swap the set's contents for the merged entries
+        targetSet.clear();
+        targetSet.addAll(usageByIdentifier.values());
+    }
+
+    /**
+     * Adds the free, total and used space of one storage usage to another and refreshes the
+     * target's pretty-printed fields.
+     *
+     * @param target the accumulated usage; updated in place
+     * @param toMerge the usage whose values are added to {@code target}
+     */
+    public static void merge(final StorageUsageDTO target, final StorageUsageDTO toMerge) {
+        // sum the raw byte values
+        target.setFreeSpaceBytes(target.getFreeSpaceBytes() + toMerge.getFreeSpaceBytes());
+        target.setTotalSpaceBytes(target.getTotalSpaceBytes() + toMerge.getTotalSpaceBytes());
+        target.setUsedSpaceBytes(target.getUsedSpaceBytes() + toMerge.getUsedSpaceBytes());
+
+        // then re-derive the display strings from the sums
+        updatePrettyPrintedFields(target);
+    }
+
+    /**
+     * Recomputes the free, total and used space display strings of a storage usage from its
+     * raw byte values. The utilization percentage is only written when the total space is not
+     * -1 (presumably -1 marks an unknown total - confirm).
+     *
+     * @param target the DTO to update
+     */
+    public static void updatePrettyPrintedFields(final StorageUsageDTO target) {
+        final String freeText = FormatUtils.formatDataSize(target.getFreeSpaceBytes());
+        target.setFreeSpace(freeText);
+        final String totalText = FormatUtils.formatDataSize(target.getTotalSpaceBytes());
+        target.setTotalSpace(totalText);
+        final String usedText = FormatUtils.formatDataSize(target.getUsedSpaceBytes());
+        target.setUsedSpace(usedText);
+
+        if (target.getTotalSpaceBytes() != -1) {
+            final int usedPercent = getUtilization(target.getUsedSpaceBytes(), target.getTotalSpaceBytes());
+            target.setUtilization(FormatUtils.formatUtilization(usedPercent));
+        }
+    }
+
+
+    /**
+     * Merges a set of garbage collection statistics into another, matching entries by name.
+     * Entries present on both sides are merged into the target's instance; entries present
+     * only in {@code toMerge} are added to {@code targetSet} as the same object instance (no
+     * copy is made). The contents of {@code targetSet} are replaced with the merged result.
+     *
+     * NOTE(review): neither set is checked for {@code null} before being iterated.
+     *
+     * @param targetSet the accumulated statistics; its contents are replaced in place
+     * @param toMerge the statistics to merge in
+     */
+    public static void mergeGarbageCollection(final Set<GarbageCollectionDTO> targetSet, final Set<GarbageCollectionDTO> toMerge) {
+        // index the accumulated entries by collector name
+        final Map<String, GarbageCollectionDTO> gcByName = new HashMap<>();
+        for (final GarbageCollectionDTO existing : targetSet) {
+            gcByName.put(existing.getName(), existing);
+        }
+
+        for (final GarbageCollectionDTO incoming : toMerge) {
+            final String name = incoming.getName();
+            final GarbageCollectionDTO match = gcByName.get(name);
+            if (match != null) {
+                merge(match, incoming);
+            } else {
+                gcByName.put(name, incoming);
+            }
+        }
+
+        // swap the set's contents for the merged entries
+        targetSet.clear();
+        targetSet.addAll(gcByName.values());
+    }
+
+    /**
+     * Adds the collection count and collection time of one garbage collection entry to another
+     * and refreshes the target's pretty-printed collection time.
+     *
+     * @param target the accumulated entry; updated in place
+     * @param toMerge the entry whose values are added to {@code target}
+     */
+    public static void merge(final GarbageCollectionDTO target, final GarbageCollectionDTO toMerge) {
+        // sum the raw values
+        target.setCollectionCount(target.getCollectionCount() + toMerge.getCollectionCount());
+        target.setCollectionMillis(target.getCollectionMillis() + toMerge.getCollectionMillis());
+
+        // then re-derive the display string from the summed duration
+        updatePrettyPrintedFields(target);
+    }
+
+    /**
+     * Recomputes the collection time display string of a garbage collection entry from its
+     * collection time in milliseconds.
+     *
+     * @param target the DTO to update
+     */
+    public static void updatePrettyPrintedFields(final GarbageCollectionDTO target) {
+        final String collectionTime = FormatUtils.formatHoursMinutesSeconds(target.getCollectionMillis(), TimeUnit.MILLISECONDS);
+        target.setCollectionTime(collectionTime);
+    }
+
+    /**
+     * Merges a node's counters into the accumulated counters. The node's aggregate snapshot is
+     * folded into the aggregate snapshot of {@code target}, and an entry for the node is
+     * always appended to the target's node snapshots; the list is created if {@code target}
+     * does not have one yet. The appended entry references the aggregate snapshot object of
+     * {@code toMerge} itself, not a copy.
+     *
+     * @param target the accumulated counters; updated in place
+     * @param toMerge the counters reported by the node
+     * @param nodeId identifier of the node that produced {@code toMerge}
+     * @param nodeAddress address of that node
+     * @param nodeApiPort API port of that node
+     */
+    public static void merge(final CountersDTO target, final CountersDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        final List<NodeCountersSnapshotDTO> existing = target.getNodeSnapshots();
+        final List<NodeCountersSnapshotDTO> nodeSnapshots;
+        if (existing != null) {
+            nodeSnapshots = existing;
+        } else {
+            nodeSnapshots = new ArrayList<>();
+        }
+
+        final NodeCountersSnapshotDTO perNode = new NodeCountersSnapshotDTO();
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        perNode.setSnapshot(toMerge.getAggregateSnapshot());
+        nodeSnapshots.add(perNode);
+
+        target.setNodeSnapshots(nodeSnapshots);
+    }
+
+    /**
+     * Merges one counters snapshot into another, matching counters by id. Counters present on
+     * both sides are merged into the target's instance; counters present only in
+     * {@code toMerge} are added to {@code target} as the same object instance (no copy is
+     * made). Does nothing if either argument is {@code null}.
+     *
+     * @param target the accumulated snapshot; its counters are replaced with the merged result
+     * @param toMerge the snapshot whose counters are merged in
+     */
+    public static void merge(final CountersSnapshotDTO target, final CountersSnapshotDTO toMerge) {
+        // Callers pass aggregate snapshots straight from their DTOs, so either side may be
+        // missing; skip quietly like the other snapshot merge methods in this class do.
+        if (target == null || toMerge == null) {
+            return;
+        }
+
+        // index the accumulated counters by id
+        final Map<String, CounterDTO> counters = new HashMap<>();
+
+        for (final CounterDTO counter : target.getCounters()) {
+            counters.put(counter.getId(), counter);
+        }
+
+        for (final CounterDTO counter : toMerge.getCounters()) {
+            final CounterDTO existing = counters.get(counter.getId());
+            if (existing == null) {
+                counters.put(counter.getId(), counter);
+            } else {
+                merge(existing, counter);
+            }
+        }
+
+        target.setCounters(counters.values());
+    }
+
+    /**
+     * Adds the value of one counter to another and refreshes the target's formatted value.
+     *
+     * @param target the accumulated counter; updated in place
+     * @param toMerge the counter whose value is added to {@code target}
+     */
+    public static void merge(final CounterDTO target, final CounterDTO toMerge) {
+        target.setValueCount(target.getValueCount() + toMerge.getValueCount());
+
+        // the display value is derived from the summed count
+        final String formatted = FormatUtils.formatCount(target.getValueCount());
+        target.setValue(formatted);
+    }
+
+
+    /**
+     * Computes {@code used} as a percentage of {@code total}, rounded to the nearest integer.
+     *
+     * @param used the amount in use
+     * @param total the amount available in total
+     * @return the rounded percentage
+     */
+    public static int getUtilization(final double used, final double total) {
+        final double percent = (used / total) * 100;
+        return (int) Math.round(percent);
+    }
+
+    /**
+     * Formats a count for display, rendering a missing value as "-".
+     *
+     * @param intStatus the count to format; may be {@code null}
+     * @return "-" when {@code intStatus} is {@code null}, otherwise the formatted count
+     */
+    public static String formatCount(final Integer intStatus) {
+        if (intStatus == null) {
+            return "-";
+        }
+        return FormatUtils.formatCount(intStatus);
+    }
+
+    /**
+     * Formats a data size for display, rendering a missing value as "-".
+     *
+     * @param longStatus the size to format; may be {@code null}
+     * @return "-" when {@code longStatus} is {@code null}, otherwise the formatted size
+     */
+    public static String formatDataSize(final Long longStatus) {
+        if (longStatus == null) {
+            return "-";
+        }
+        return FormatUtils.formatDataSize(longStatus);
+    }
+
+    /**
+     * Builds the combined "count / size" display string used throughout the status DTOs.
+     *
+     * @param count the FlowFile count; may be {@code null}
+     * @param bytes the size in bytes; may be {@code null}
+     * @return the formatted count and the formatted size, separated by " / "
+     */
+    public static String prettyPrint(final Integer count, final Long bytes) {
+        final String countText = formatCount(count);
+        final String sizeText = formatDataSize(bytes);
+        return countText + " / " + sizeText;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
index d3d5559..3966a31 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
@@ -46,7 +46,7 @@ public class ClusteredEventAccess implements EventAccess {
 
     @Override
     public ProcessGroupStatus getControllerStatus() {
-        return clusterManager.getProcessGroupStatus(WebClusterManager.ROOT_GROUP_ID_ALIAS);
+        return new ProcessGroupStatus();
     }
 
     @Override