You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/04 18:28:48 UTC
[12/18] nifi git commit: NIFI-1563: - Federate requests and merge
responses from nodes instead of storing bulletins and stats at NCM - Updating
UI to support restructured status history DTO. - Return 'Insufficient
History' message if aggregate stats don't [... subject truncated by the mail archive]
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatusEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatusEntity.java
new file mode 100644
index 0000000..8b62331
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatusEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ConnectionStatusDTO.
+ */
+@XmlRootElement(name = "connectionStatusEntity")
+public class ConnectionStatusEntity extends Entity {
+
+ private ConnectionStatusDTO connectionStatus;
+
+ /**
+ * The ConnectionStatusDTO that is being serialized.
+ *
+ * @return The ConnectionStatusDTO object
+ */
+ public ConnectionStatusDTO getConnectionStatus() {
+ return connectionStatus;
+ }
+
+ public void setConnectionStatus(ConnectionStatusDTO connectionStatus) {
+ this.connectionStatus = connectionStatus;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeSystemDiagnosticsEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeSystemDiagnosticsEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeSystemDiagnosticsEntity.java
deleted file mode 100644
index 443276c..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeSystemDiagnosticsEntity.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.web.api.entity;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
-
-/**
- * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a NodeSystemDiagnosticsDTO.
- */
-@XmlRootElement(name = "nodeSystemDiagnosticsEntity")
-public class NodeSystemDiagnosticsEntity extends Entity {
-
- private NodeSystemDiagnosticsDTO nodeSystemDiagnostics;
-
- /**
- * The NodeSystemDiagnosticsDTO that is being serialized.
- *
- * @return The NodeSystemDiagnosticsDTO object
- */
- public NodeSystemDiagnosticsDTO getNodeSystemDiagnostics() {
- return nodeSystemDiagnostics;
- }
-
- public void setNodeSystemDiagnostics(NodeSystemDiagnosticsDTO nodeSystemDiagnostics) {
- this.nodeSystemDiagnostics = nodeSystemDiagnostics;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortStatusEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortStatusEntity.java
new file mode 100644
index 0000000..e0b49c4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortStatusEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a PortStatusDTO.
+ */
+@XmlRootElement(name = "portStatusEntity")
+public class PortStatusEntity extends Entity {
+
+ private PortStatusDTO portStatus;
+
+ /**
+ * The PortStatusDTO that is being serialized.
+ *
+ * @return The PortStatusDTO object
+ */
+ public PortStatusDTO getPortStatus() {
+ return portStatus;
+ }
+
+ public void setPortStatus(PortStatusDTO portStatus) {
+ this.portStatus = portStatus;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorStatusEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorStatusEntity.java
new file mode 100644
index 0000000..0c2170c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorStatusEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ProcessorStatusDTO.
+ */
+@XmlRootElement(name = "processorStatusEntity")
+public class ProcessorStatusEntity extends Entity {
+
+ private ProcessorStatusDTO processorStatus;
+
+ /**
+ * The ProcessorStatusDTO that is being serialized.
+ *
+ * @return The ProcessorStatusDTO object
+ */
+ public ProcessorStatusDTO getProcessorStatus() {
+ return processorStatus;
+ }
+
+ public void setProcessorStatus(ProcessorStatusDTO processorStatus) {
+ this.processorStatus = processorStatus;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupStatusEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupStatusEntity.java
new file mode 100644
index 0000000..a5031ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupStatusEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a RemoteProcessGroupStatusDTO.
+ */
+@XmlRootElement(name = "remoteProcessGroupStatusEntity")
+public class RemoteProcessGroupStatusEntity extends Entity {
+
+ private RemoteProcessGroupStatusDTO remoteProcessGroupStatus;
+
+ /**
+ * The RemoteProcessGroupStatusDTO that is being serialized.
+ *
+ * @return The RemoteProcessGroupStatusDTO object
+ */
+ public RemoteProcessGroupStatusDTO getRemoteProcessGroupStatus() {
+ return remoteProcessGroupStatus;
+ }
+
+ public void setRemoteProcessGroupStatus(RemoteProcessGroupStatusDTO remoteProcessGroupStatus) {
+ this.remoteProcessGroupStatus = remoteProcessGroupStatus;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
index f3e5df4..be0c339 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
@@ -20,7 +20,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
/**
@@ -52,15 +51,6 @@ public interface NodeProtocolSender {
void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException;
/**
- * Sends a bulletins message to the cluster manager.
- *
- * @param msg a message
- * @throws ProtocolException pe
- * @throws UnknownServiceAddressException ex
- */
- void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
- /**
* Sends a failure notification if the controller was unable start.
*
* @param msg a message
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
index 993dea5..9ae6182 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
@@ -32,7 +32,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
@@ -117,11 +116,6 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
}
@Override
- public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
- sendProtocolMessage(msg);
- }
-
- @Override
public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
sendProtocolMessage(msg);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
index 2992e38..0a9a064 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
@@ -24,7 +24,6 @@ import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
@@ -104,11 +103,6 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL
}
@Override
- public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
- sender.sendBulletins(msg);
- }
-
- @Override
public void setBulletinRepository(final BulletinRepository bulletinRepository) {
listener.setBulletinRepository(bulletinRepository);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index f0a9fa7..516b67e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -26,7 +26,6 @@ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
import org.apache.nifi.cluster.protocol.message.PingMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
@@ -97,8 +96,4 @@ public class ObjectFactory {
public PrimaryRoleAssignmentMessage createPrimaryRoleAssignmentMessage() {
return new PrimaryRoleAssignmentMessage();
}
-
- public NodeBulletinsMessage createBulletinsMessage() {
- return new NodeBulletinsMessage();
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java
deleted file mode 100644
index 6df3ba4..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.message;
-
-import org.apache.nifi.cluster.protocol.NodeBulletins;
-import javax.xml.bind.annotation.XmlRootElement;
-
-/**
- */
-@XmlRootElement(name = "nodeBulletinsMessage")
-public class NodeBulletinsMessage extends ProtocolMessage {
-
- private NodeBulletins bulletins;
-
- @Override
- public MessageType getType() {
- return MessageType.BULLETINS;
- }
-
- public NodeBulletins getBulletins() {
- return bulletins;
- }
-
- public void setBulletins(NodeBulletins bulletins) {
- this.bulletins = bulletins;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index c6f7ce0..f01efd8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -21,8 +21,6 @@ public abstract class ProtocolMessage {
private volatile String requestorDN;
public static enum MessageType {
-
- BULLETINS,
CONNECTION_REQUEST,
CONNECTION_RESPONSE,
CONTROLLER_STARTUP_FAILURE,
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
index 336d675..51de54b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
@@ -33,8 +33,6 @@ import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.reporting.BulletinRepository;
@@ -168,15 +166,4 @@ public interface ClusterManager extends NodeInformant {
* @return the bulletin repository
*/
BulletinRepository getBulletinRepository();
-
- /**
- * @param groupId groupId
- * @return a {@link ProcessGroupStatus} that represents the status of all nodes with the given {@link Status}es for the given ProcessGroup id, or null if no nodes exist with the given statuses
- */
- ProcessGroupStatus getProcessGroupStatus(String groupId);
-
- /**
- * @return a merged representation of the System Diagnostics for all nodes in the cluster
- */
- SystemDiagnostics getSystemDiagnostics();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
new file mode 100644
index 0000000..66ad494
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.manager;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.RunStatus;
+import org.apache.nifi.controller.status.TransmissionStatus;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.web.api.dto.BulletinDTO;
+import org.apache.nifi.web.api.dto.CounterDTO;
+import org.apache.nifi.web.api.dto.CountersDTO;
+import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
+import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO;
+import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.StorageUsageDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
+
+public class StatusMerger {
+    /**
+     * Accumulates the controller-wide status reported by one node into {@code target}.
+     * Every numeric counter is summed, the three bulletin lists are concatenated via
+     * {@link #mergeBulletins(List, List)}, and the pretty-printed fields are then recomputed.
+     * The call is a no-op when either argument is {@code null}.
+     *
+     * @param target the running total; modified in place
+     * @param toMerge the status whose values are added to {@code target}
+     */
+    public static void merge(final ControllerStatusDTO target, final ControllerStatusDTO toMerge) {
+        if (target != null && toMerge != null) {
+            // component run-state counts
+            target.setRunningCount(target.getRunningCount() + toMerge.getRunningCount());
+            target.setStoppedCount(target.getStoppedCount() + toMerge.getStoppedCount());
+            target.setInvalidCount(target.getInvalidCount() + toMerge.getInvalidCount());
+            target.setDisabledCount(target.getDisabledCount() + toMerge.getDisabledCount());
+
+            // remote ports
+            target.setActiveRemotePortCount(target.getActiveRemotePortCount() + toMerge.getActiveRemotePortCount());
+            target.setInactiveRemotePortCount(target.getInactiveRemotePortCount() + toMerge.getInactiveRemotePortCount());
+
+            // threads and queue contents
+            target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
+            target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
+            target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
+
+            // bulletins: each list becomes the concatenation of both sides
+            target.setBulletins(mergeBulletins(target.getBulletins(), toMerge.getBulletins()));
+            target.setControllerServiceBulletins(mergeBulletins(target.getControllerServiceBulletins(), toMerge.getControllerServiceBulletins()));
+            target.setReportingTaskBulletins(mergeBulletins(target.getReportingTaskBulletins(), toMerge.getReportingTaskBulletins()));
+
+            updatePrettyPrintedFields(target);
+        }
+    }
+
+    /**
+     * Recomputes the display strings of a controller status from its raw values: the
+     * "connected / total" node summary and the "count / size" queue summary.
+     *
+     * @param target the DTO whose display fields are refreshed
+     */
+    public static void updatePrettyPrintedFields(final ControllerStatusDTO target) {
+        final String connected = formatCount(target.getConnectedNodeCount());
+        final String total = formatCount(target.getTotalNodeCount());
+        final String queued = prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued());
+
+        target.setQueued(queued);
+        target.setConnectedNodes(connected + " / " + total);
+    }
+
+    /**
+     * Concatenates two bulletin lists into a newly allocated list; neither input is modified.
+     * A {@code null} input is treated as empty.
+     *
+     * @param targetBulletins the bulletins that come first in the result; may be {@code null}
+     * @param toMerge the bulletins appended after them; may be {@code null}
+     * @return a new list holding the elements of both inputs, never {@code null}
+     */
+    public static List<BulletinDTO> mergeBulletins(final List<BulletinDTO> targetBulletins, final List<BulletinDTO> toMerge) {
+        final List<BulletinDTO> combined;
+        if (targetBulletins == null) {
+            combined = new ArrayList<BulletinDTO>();
+        } else {
+            combined = new ArrayList<BulletinDTO>(targetBulletins);
+        }
+
+        if (toMerge != null) {
+            combined.addAll(toMerge);
+        }
+        return combined;
+    }
+
+
+    /**
+     * Merges one node's process group status into {@code target}: the aggregate snapshots are
+     * merged, and, if {@code target} already carries a node snapshot list, an entry describing
+     * the contributing node is appended to it.
+     *
+     * @param target the accumulated status; modified in place
+     * @param toMerge the status reported by the node
+     * @param nodeId identifier of the reporting node
+     * @param nodeAddress address of the reporting node
+     * @param nodeApiPort API port of the reporting node
+     */
+    public static void merge(final ProcessGroupStatusDTO target, final ProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        // per-node detail is only recorded when the target has a list to record it in
+        if (target.getNodeSnapshots() == null) {
+            return;
+        }
+
+        final NodeProcessGroupStatusSnapshotDTO perNode = new NodeProcessGroupStatusSnapshotDTO();
+        perNode.setStatusSnapshot(toMerge.getAggregateSnapshot());
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        target.getNodeSnapshots().add(perNode);
+    }
+
+    /**
+     * Merges one node's process group snapshot into {@code target}. The group-level counters are summed
+     * and the pretty-printed fields refreshed; then each kind of child snapshot (connections, processors,
+     * input ports, output ports, child process groups, remote process groups) is merged by component id.
+     * A component found only in {@code toMerge} is cloned into {@code target}; a component found on both
+     * sides is merged in place. Child process groups are merged recursively through this same method.
+     * The call is a no-op when either argument is {@code null}.
+     *
+     * @param target the accumulated snapshot; modified in place
+     * @param toMerge the snapshot whose values are added to {@code target}
+     */
+    public static void merge(final ProcessGroupStatusSnapshotDTO target, final ProcessGroupStatusSnapshotDTO toMerge) {
+        if (target == null || toMerge == null) {
+            return;
+        }
+
+        target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
+        target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());
+
+        target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
+        target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
+
+        target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
+        target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());
+
+        target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
+        target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut());
+
+        target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred());
+        target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred());
+
+        target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived());
+        target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived());
+
+        target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
+        target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());
+
+        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
+        updatePrettyPrintedFields(target);
+
+        // connection status: index the target's snapshots by id, then merge or adopt each incoming one
+        final Map<String, ConnectionStatusSnapshotDTO> mergedConnectionMap = new HashMap<>();
+        for (final ConnectionStatusSnapshotDTO status : replaceNull(target.getConnectionStatusSnapshots())) {
+            mergedConnectionMap.put(status.getId(), status);
+        }
+
+        for (final ConnectionStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getConnectionStatusSnapshots())) {
+            final ConnectionStatusSnapshotDTO merged = mergedConnectionMap.get(statusToMerge.getId());
+            if (merged == null) {
+                // clone so that later merges do not modify the contributing node's own snapshot
+                mergedConnectionMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        target.setConnectionStatusSnapshots(mergedConnectionMap.values());
+
+        // processor status
+        final Map<String, ProcessorStatusSnapshotDTO> mergedProcessorMap = new HashMap<>();
+        for (final ProcessorStatusSnapshotDTO status : replaceNull(target.getProcessorStatusSnapshots())) {
+            mergedProcessorMap.put(status.getId(), status);
+        }
+
+        for (final ProcessorStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getProcessorStatusSnapshots())) {
+            final ProcessorStatusSnapshotDTO merged = mergedProcessorMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedProcessorMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        target.setProcessorStatusSnapshots(mergedProcessorMap.values());
+
+        // input ports
+        final Map<String, PortStatusSnapshotDTO> mergedInputPortMap = new HashMap<>();
+        for (final PortStatusSnapshotDTO status : replaceNull(target.getInputPortStatusSnapshots())) {
+            mergedInputPortMap.put(status.getId(), status);
+        }
+
+        for (final PortStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getInputPortStatusSnapshots())) {
+            final PortStatusSnapshotDTO merged = mergedInputPortMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedInputPortMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        target.setInputPortStatusSnapshots(mergedInputPortMap.values());
+
+        // output ports
+        final Map<String, PortStatusSnapshotDTO> mergedOutputPortMap = new HashMap<>();
+        for (final PortStatusSnapshotDTO status : replaceNull(target.getOutputPortStatusSnapshots())) {
+            mergedOutputPortMap.put(status.getId(), status);
+        }
+
+        for (final PortStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getOutputPortStatusSnapshots())) {
+            final PortStatusSnapshotDTO merged = mergedOutputPortMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedOutputPortMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        target.setOutputPortStatusSnapshots(mergedOutputPortMap.values());
+
+        // child groups (merged recursively)
+        final Map<String, ProcessGroupStatusSnapshotDTO> mergedGroupMap = new HashMap<>();
+        for (final ProcessGroupStatusSnapshotDTO status : replaceNull(target.getProcessGroupStatusSnapshots())) {
+            mergedGroupMap.put(status.getId(), status);
+        }
+
+        for (final ProcessGroupStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getProcessGroupStatusSnapshots())) {
+            final ProcessGroupStatusSnapshotDTO merged = mergedGroupMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedGroupMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        // store the merged child groups; the original re-stored the output ports here, which
+        // discarded any child group that only the merged-in node reported
+        target.setProcessGroupStatusSnapshots(mergedGroupMap.values());
+
+        // remote groups
+        final Map<String, RemoteProcessGroupStatusSnapshotDTO> mergedRemoteGroupMap = new HashMap<>();
+        for (final RemoteProcessGroupStatusSnapshotDTO status : replaceNull(target.getRemoteProcessGroupStatusSnapshots())) {
+            mergedRemoteGroupMap.put(status.getId(), status);
+        }
+
+        for (final RemoteProcessGroupStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getRemoteProcessGroupStatusSnapshots())) {
+            final RemoteProcessGroupStatusSnapshotDTO merged = mergedRemoteGroupMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedRemoteGroupMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        target.setRemoteProcessGroupStatusSnapshots(mergedRemoteGroupMap.values());
+    }
+
+    /**
+     * Substitutes an empty list for a {@code null} collection so callers can iterate unconditionally.
+     *
+     * @param collection the collection to check; may be {@code null}
+     * @param <T> the element type
+     * @return {@code collection} itself when non-null, otherwise an empty list
+     */
+    private static <T> Collection<T> replaceNull(final Collection<T> collection) {
+        if (collection != null) {
+            return collection;
+        }
+        return Collections.<T> emptyList();
+    }
+
+
+    /**
+     * Refreshes the "pretty printed" string fields of a process group snapshot from the raw numeric
+     * values it currently holds. For instance, {@link ProcessGroupStatusSnapshotDTO#setInput(String)}
+     * receives the formatted combination of {@link ProcessGroupStatusSnapshotDTO#getFlowFilesIn()} and
+     * {@link ProcessGroupStatusSnapshotDTO#getBytesIn()}.
+     *
+     * The formatting lives here instead of in the DTO because the DTO needs to stay purely
+     * getters & setters - otherwise the automatic marshalling and unmarshalling to/from JSON becomes
+     * very complicated.
+     *
+     * @param target the DTO to update
+     */
+    public static void updatePrettyPrintedFields(final ProcessGroupStatusSnapshotDTO target) {
+        // queue: the combined form is the count and size strings joined the same way prettyPrint joins them
+        final String queuedCount = formatCount(target.getFlowFilesQueued());
+        final String queuedSize = formatDataSize(target.getBytesQueued());
+        target.setQueued(queuedCount + " / " + queuedSize);
+        target.setQueuedCount(queuedCount);
+        target.setQueuedSize(queuedSize);
+
+        // flowfile count / byte size pairs
+        target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn()));
+        target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut()));
+        target.setTransferred(prettyPrint(target.getFlowFilesTransferred(), target.getBytesTransferred()));
+        target.setReceived(prettyPrint(target.getFlowFilesReceived(), target.getBytesReceived()));
+        target.setSent(prettyPrint(target.getFlowFilesSent(), target.getBytesSent()));
+
+        // size-only values
+        target.setRead(formatDataSize(target.getBytesRead()));
+        target.setWritten(formatDataSize(target.getBytesWritten()));
+    }
+
+    /**
+     * Merges one node's remote process group status into {@code target}: the aggregate snapshots are
+     * merged, and, if {@code target} already carries a node snapshot list, an entry describing the
+     * contributing node is appended to it.
+     *
+     * @param target the accumulated status; modified in place
+     * @param toMerge the status reported by the node
+     * @param nodeId identifier of the reporting node
+     * @param nodeAddress address of the reporting node
+     * @param nodeApiPort API port of the reporting node
+     */
+    public static void merge(final RemoteProcessGroupStatusDTO target, final RemoteProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        // per-node detail is only recorded when the target has a list to record it in
+        if (target.getNodeSnapshots() == null) {
+            return;
+        }
+
+        final NodeRemoteProcessGroupStatusSnapshotDTO perNode = new NodeRemoteProcessGroupStatusSnapshotDTO();
+        perNode.setStatusSnapshot(toMerge.getAggregateSnapshot());
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        target.getNodeSnapshots().add(perNode);
+    }
+
+    /**
+     * Merges one node's port status into {@code target}: the aggregate snapshots are merged, and,
+     * if {@code target} already carries a node snapshot list, an entry describing the contributing
+     * node is appended to it.
+     *
+     * @param target the accumulated status; modified in place
+     * @param toMerge the status reported by the node
+     * @param nodeId identifier of the reporting node
+     * @param nodeAddress address of the reporting node
+     * @param nodeApiPort API port of the reporting node
+     */
+    public static void merge(final PortStatusDTO target, final PortStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        // per-node detail is only recorded when the target has a list to record it in
+        if (target.getNodeSnapshots() == null) {
+            return;
+        }
+
+        final NodePortStatusSnapshotDTO perNode = new NodePortStatusSnapshotDTO();
+        perNode.setStatusSnapshot(toMerge.getAggregateSnapshot());
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        target.getNodeSnapshots().add(perNode);
+    }
+
+    /**
+     * Merges one node's connection status into {@code target}: the aggregate snapshots are merged,
+     * and, if {@code target} already carries a node snapshot list, an entry describing the
+     * contributing node is appended to it.
+     *
+     * @param target the accumulated status; modified in place
+     * @param toMerge the status reported by the node
+     * @param nodeId identifier of the reporting node
+     * @param nodeAddress address of the reporting node
+     * @param nodeApiPort API port of the reporting node
+     */
+    public static void merge(final ConnectionStatusDTO target, final ConnectionStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        // per-node detail is only recorded when the target has a list to record it in
+        if (target.getNodeSnapshots() == null) {
+            return;
+        }
+
+        final NodeConnectionStatusSnapshotDTO perNode = new NodeConnectionStatusSnapshotDTO();
+        perNode.setStatusSnapshot(toMerge.getAggregateSnapshot());
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        target.getNodeSnapshots().add(perNode);
+    }
+
+    /**
+     * Merges one node's processor status into {@code target}: the aggregate snapshots are merged,
+     * and, if {@code target} already carries a node snapshot list, an entry describing the
+     * contributing node is appended to it.
+     *
+     * @param target the accumulated status; modified in place
+     * @param toMerge the status reported by the node
+     * @param nodeId identifier of the reporting node
+     * @param nodeAddress address of the reporting node
+     * @param nodeApiPort API port of the reporting node
+     */
+    public static void merge(final ProcessorStatusDTO target, final ProcessorStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        // per-node detail is only recorded when the target has a list to record it in
+        if (target.getNodeSnapshots() == null) {
+            return;
+        }
+
+        final NodeProcessorStatusSnapshotDTO perNode = new NodeProcessorStatusSnapshotDTO();
+        perNode.setStatusSnapshot(toMerge.getAggregateSnapshot());
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        target.getNodeSnapshots().add(perNode);
+    }
+
+    /**
+     * Merges one node's processor snapshot into {@code target}: an Invalid run status on the incoming
+     * side overrides the target's, all counters are summed, and the pretty-printed fields are refreshed.
+     * The call is a no-op when either argument is {@code null}.
+     *
+     * @param target the accumulated snapshot; modified in place
+     * @param toMerge the snapshot whose values are added to {@code target}
+     */
+    public static void merge(final ProcessorStatusSnapshotDTO target, final ProcessorStatusSnapshotDTO toMerge) {
+        if (target != null && toMerge != null) {
+            // Disabled/stopped/running is flow configuration and should be the same on every node.
+            // Invalid, however, can come from a node's environment, so an invalid report from any
+            // node wins over the configured run status.
+            final String invalid = RunStatus.Invalid.name();
+            if (invalid.equals(toMerge.getRunStatus())) {
+                target.setRunStatus(invalid);
+            }
+
+            // data moved through the processor
+            target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());
+            target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
+            target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut());
+            target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
+            target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
+            target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());
+
+            // scheduling
+            target.setTaskCount(target.getTaskCount() + toMerge.getTaskCount());
+            target.setTasksDurationNanos(target.getTasksDurationNanos() + toMerge.getTasksDurationNanos());
+            target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
+
+            updatePrettyPrintedFields(target);
+        }
+    }
+
+    /**
+     * Recomputes the display strings of a processor snapshot (input, output, read, written, task count
+     * and task duration) from its raw values.
+     *
+     * @param target the DTO whose display fields are refreshed
+     */
+    public static void updatePrettyPrintedFields(final ProcessorStatusSnapshotDTO target) {
+        target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn()));
+        target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut()));
+        target.setRead(formatDataSize(target.getBytesRead()));
+        target.setWritten(formatDataSize(target.getBytesWritten()));
+
+        // formatCount(Integer) already yields "-" for a missing count
+        final Integer taskCount = target.getTaskCount();
+        target.setTasks(formatCount(taskCount));
+
+        target.setTasksDuration(FormatUtils.formatHoursMinutesSeconds(target.getTasksDurationNanos(), TimeUnit.NANOSECONDS));
+    }
+
+
+    /**
+     * Merges one node's connection snapshot into {@code target} by summing the in, out and queued
+     * counters and refreshing the pretty-printed fields. The call is a no-op when either argument
+     * is {@code null}.
+     *
+     * @param target the accumulated snapshot; modified in place
+     * @param toMerge the snapshot whose values are added to {@code target}
+     */
+    public static void merge(final ConnectionStatusSnapshotDTO target, final ConnectionStatusSnapshotDTO toMerge) {
+        if (target != null && toMerge != null) {
+            // queued
+            target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
+            target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
+
+            // in
+            target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());
+            target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
+
+            // out
+            target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut());
+            target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
+
+            updatePrettyPrintedFields(target);
+        }
+    }
+
+    /**
+     * Recomputes the display strings of a connection snapshot (queued, queued count, queued size,
+     * input and output) from its raw values.
+     *
+     * @param target the DTO whose display fields are refreshed
+     */
+    public static void updatePrettyPrintedFields(final ConnectionStatusSnapshotDTO target) {
+        // the combined form is the count and size strings joined the same way prettyPrint joins them
+        final String queuedCount = formatCount(target.getFlowFilesQueued());
+        final String queuedSize = formatDataSize(target.getBytesQueued());
+        target.setQueued(queuedCount + " / " + queuedSize);
+        target.setQueuedCount(queuedCount);
+        target.setQueuedSize(queuedSize);
+
+        target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn()));
+        target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut()));
+    }
+
+
+
+    /**
+     * Merges one node's remote process group snapshot into {@code target}. The merged group is marked
+     * Transmitting if either side is transmitting, authorization issues from both sides are
+     * concatenated, the thread count and sent/received counters are summed, and the pretty-printed
+     * fields are refreshed. The call is a no-op when either argument is {@code null}, matching the
+     * other snapshot merges in this class.
+     *
+     * @param target the accumulated snapshot; modified in place
+     * @param toMerge the snapshot whose values are added to {@code target}
+     */
+    public static void merge(final RemoteProcessGroupStatusSnapshotDTO target, final RemoteProcessGroupStatusSnapshotDTO toMerge) {
+        // callers pass aggregate snapshots straight from their getters, so either side may be absent
+        if (target == null || toMerge == null) {
+            return;
+        }
+
+        final String transmittingValue = TransmissionStatus.Transmitting.name();
+        if (transmittingValue.equals(target.getTransmissionStatus()) || transmittingValue.equals(toMerge.getTransmissionStatus())) {
+            target.setTransmissionStatus(transmittingValue);
+        }
+
+        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
+
+        // always install a fresh list so the inputs' own lists are never modified
+        final List<String> authIssues = new ArrayList<>();
+        if (target.getAuthorizationIssues() != null) {
+            authIssues.addAll(target.getAuthorizationIssues());
+        }
+        if (toMerge.getAuthorizationIssues() != null) {
+            authIssues.addAll(toMerge.getAuthorizationIssues());
+        }
+        target.setAuthorizationIssues(authIssues);
+
+        target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());
+        target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
+        target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived());
+        target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived());
+        updatePrettyPrintedFields(target);
+    }
+
+    /**
+     * Recomputes the "sent" and "received" display strings of a remote process group snapshot
+     * from its raw flowfile counts and byte sizes.
+     *
+     * @param target the DTO whose display fields are refreshed
+     */
+    public static void updatePrettyPrintedFields(final RemoteProcessGroupStatusSnapshotDTO target) {
+        final String sent = prettyPrint(target.getFlowFilesSent(), target.getBytesSent());
+        final String received = prettyPrint(target.getFlowFilesReceived(), target.getBytesReceived());
+
+        target.setReceived(received);
+        target.setSent(sent);
+    }
+
+
+
+    /**
+     * Merges one node's port snapshot into {@code target}: counters are summed, the port counts as
+     * transmitting if either side is transmitting, an Invalid run status on the incoming side
+     * overrides the target's, and the pretty-printed fields are refreshed. The call is a no-op
+     * when either argument is {@code null}.
+     *
+     * @param target the accumulated snapshot; modified in place
+     * @param toMerge the snapshot whose values are added to {@code target}
+     */
+    public static void merge(final PortStatusSnapshotDTO target, final PortStatusSnapshotDTO toMerge) {
+        if (target != null && toMerge != null) {
+            target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
+            target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());
+            target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
+            target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut());
+            target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
+
+            final boolean transmitting = Boolean.TRUE.equals(target.isTransmitting()) || Boolean.TRUE.equals(toMerge.isTransmitting());
+            target.setTransmitting(transmitting);
+
+            // A port's run status should not be affected by environmental conditions, so this
+            // should be unnecessary; it is kept in case that ever changes.
+            final String invalid = RunStatus.Invalid.name();
+            if (invalid.equals(toMerge.getRunStatus())) {
+                target.setRunStatus(invalid);
+            }
+
+            updatePrettyPrintedFields(target);
+        }
+    }
+
+    /**
+     * Recomputes the "input" and "output" display strings of a port snapshot from its raw
+     * flowfile counts and byte sizes.
+     *
+     * @param target the DTO whose display fields are refreshed
+     */
+    public static void updatePrettyPrintedFields(final PortStatusSnapshotDTO target) {
+        final String input = prettyPrint(target.getFlowFilesIn(), target.getBytesIn());
+        final String output = prettyPrint(target.getFlowFilesOut(), target.getBytesOut());
+
+        target.setInput(input);
+        target.setOutput(output);
+    }
+
+
+    /**
+     * Merges one node's system diagnostics into {@code target}: the aggregate snapshots are merged and
+     * an entry describing the contributing node is appended to the target's node snapshot list, which
+     * is created if the target does not have one yet.
+     *
+     * @param target the accumulated diagnostics; modified in place
+     * @param toMerge the diagnostics reported by the node
+     * @param nodeId identifier of the reporting node
+     * @param nodeAddress address of the reporting node
+     * @param nodeApiPort API port of the reporting node
+     */
+    public static void merge(final SystemDiagnosticsDTO target, final SystemDiagnosticsDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        final NodeSystemDiagnosticsSnapshotDTO perNode = new NodeSystemDiagnosticsSnapshotDTO();
+        perNode.setNodeId(nodeId);
+        perNode.setAddress(nodeAddress);
+        perNode.setApiPort(nodeApiPort);
+        perNode.setSnapshot(toMerge.getAggregateSnapshot());
+
+        final List<NodeSystemDiagnosticsSnapshotDTO> existing = target.getNodeSnapshots();
+        final List<NodeSystemDiagnosticsSnapshotDTO> snapshots = (existing == null) ? new ArrayList<NodeSystemDiagnosticsSnapshotDTO>() : existing;
+        snapshots.add(perNode);
+        target.setNodeSnapshots(snapshots);
+    }
+
+    /**
+     * Merges one node's system diagnostics snapshot into {@code target}. All numeric values are summed,
+     * the content and flowfile repository storage usage and the garbage collection statistics are merged
+     * entry by entry, and the pretty-printed fields are refreshed. The call is a no-op when either
+     * argument is {@code null}.
+     *
+     * @param target the accumulated snapshot; modified in place
+     * @param toMerge the snapshot whose values are added to {@code target}
+     */
+    public static void merge(final SystemDiagnosticsSnapshotDTO target, final SystemDiagnosticsSnapshotDTO toMerge) {
+        if (target != null && toMerge != null) {
+            // processors and threads
+            target.setAvailableProcessors(target.getAvailableProcessors() + toMerge.getAvailableProcessors());
+            target.setProcessorLoadAverage(target.getProcessorLoadAverage() + toMerge.getProcessorLoadAverage());
+            target.setTotalThreads(target.getTotalThreads() + toMerge.getTotalThreads());
+            target.setDaemonThreads(target.getDaemonThreads() + toMerge.getDaemonThreads());
+
+            // heap
+            target.setMaxHeapBytes(target.getMaxHeapBytes() + toMerge.getMaxHeapBytes());
+            target.setTotalHeapBytes(target.getTotalHeapBytes() + toMerge.getTotalHeapBytes());
+            target.setUsedHeapBytes(target.getUsedHeapBytes() + toMerge.getUsedHeapBytes());
+            target.setFreeHeapBytes(target.getFreeHeapBytes() + toMerge.getFreeHeapBytes());
+
+            // non heap
+            target.setMaxNonHeapBytes(target.getMaxNonHeapBytes() + toMerge.getMaxNonHeapBytes());
+            target.setTotalNonHeapBytes(target.getTotalNonHeapBytes() + toMerge.getTotalNonHeapBytes());
+            target.setUsedNonHeapBytes(target.getUsedNonHeapBytes() + toMerge.getUsedNonHeapBytes());
+            target.setFreeNonHeapBytes(target.getFreeNonHeapBytes() + toMerge.getFreeNonHeapBytes());
+
+            // nested collections
+            merge(target.getContentRepositoryStorageUsage(), toMerge.getContentRepositoryStorageUsage());
+            merge(target.getFlowFileRepositoryStorageUsage(), toMerge.getFlowFileRepositoryStorageUsage());
+            mergeGarbageCollection(target.getGarbageCollection(), toMerge.getGarbageCollection());
+
+            updatePrettyPrintedFields(target);
+        }
+    }
+
+    /**
+     * Recomputes the display strings for heap and non-heap memory from the raw byte values. A
+     * utilization string is only set for a memory area whose maximum is not {@code -1}.
+     *
+     * @param target the DTO whose display fields are refreshed
+     */
+    public static void updatePrettyPrintedFields(final SystemDiagnosticsSnapshotDTO target) {
+        // heap sizes
+        target.setMaxHeap(FormatUtils.formatDataSize(target.getMaxHeapBytes()));
+        target.setTotalHeap(FormatUtils.formatDataSize(target.getTotalHeapBytes()));
+        target.setUsedHeap(FormatUtils.formatDataSize(target.getUsedHeapBytes()));
+        target.setFreeHeap(FormatUtils.formatDataSize(target.getFreeHeapBytes()));
+
+        // heap utilization, skipped when the maximum is -1
+        if (target.getMaxHeapBytes() != -1) {
+            final int heapPercent = getUtilization(target.getUsedHeapBytes(), target.getMaxHeapBytes());
+            target.setHeapUtilization(FormatUtils.formatUtilization(heapPercent));
+        }
+
+        // non heap sizes
+        target.setMaxNonHeap(FormatUtils.formatDataSize(target.getMaxNonHeapBytes()));
+        target.setTotalNonHeap(FormatUtils.formatDataSize(target.getTotalNonHeapBytes()));
+        target.setUsedNonHeap(FormatUtils.formatDataSize(target.getUsedNonHeapBytes()));
+        target.setFreeNonHeap(FormatUtils.formatDataSize(target.getFreeNonHeapBytes()));
+
+        // non heap utilization, skipped when the maximum is -1
+        if (target.getMaxNonHeapBytes() != -1) {
+            final int nonHeapPercent = getUtilization(target.getUsedNonHeapBytes(), target.getMaxNonHeapBytes());
+            target.setNonHeapUtilization(FormatUtils.formatUtilization(nonHeapPercent));
+        }
+    }
+
+    /**
+     * Merges the storage usage entries of {@code toMerge} into {@code targetSet}, matching entries by
+     * identifier. Matching entries are summed in place; entries found only in {@code toMerge} are added
+     * to {@code targetSet}. The call is a no-op when either set is {@code null}: a missing
+     * {@code toMerge} has nothing to contribute, and a missing {@code targetSet} cannot be added to.
+     *
+     * @param targetSet the accumulated entries; modified in place; may be {@code null}
+     * @param toMerge the entries to merge in; may be {@code null}
+     */
+    public static void merge(final Set<StorageUsageDTO> targetSet, final Set<StorageUsageDTO> toMerge) {
+        // callers pass these sets straight from DTO getters, so either may be absent
+        if (targetSet == null || toMerge == null) {
+            return;
+        }
+
+        final Map<String, StorageUsageDTO> storageById = new HashMap<>();
+        for (final StorageUsageDTO targetUsage : targetSet) {
+            storageById.put(targetUsage.getIdentifier(), targetUsage);
+        }
+
+        for (final StorageUsageDTO usageToMerge : toMerge) {
+            final StorageUsageDTO targetUsage = storageById.get(usageToMerge.getIdentifier());
+            if (targetUsage == null) {
+                // NOTE(review): the incoming DTO itself is adopted, so later merges will modify an object
+                // the contributing node's snapshot may still reference - confirm whether a copy is needed
+                storageById.put(usageToMerge.getIdentifier(), usageToMerge);
+            } else {
+                merge(targetUsage, usageToMerge);
+            }
+        }
+
+        targetSet.clear();
+        targetSet.addAll(storageById.values());
+    }
+
+    /**
+     * Adds the free, total and used byte counts of {@code toMerge} to {@code target} and refreshes
+     * the target's pretty-printed fields.
+     *
+     * @param target the accumulated storage usage; modified in place
+     * @param toMerge the storage usage whose values are added to {@code target}
+     */
+    public static void merge(final StorageUsageDTO target, final StorageUsageDTO toMerge) {
+        target.setTotalSpaceBytes(target.getTotalSpaceBytes() + toMerge.getTotalSpaceBytes());
+        target.setUsedSpaceBytes(target.getUsedSpaceBytes() + toMerge.getUsedSpaceBytes());
+        target.setFreeSpaceBytes(target.getFreeSpaceBytes() + toMerge.getFreeSpaceBytes());
+
+        updatePrettyPrintedFields(target);
+    }
+
+    /**
+     * Recomputes the display strings of a storage usage entry from its raw byte values. The
+     * utilization string is only set when the total space is not {@code -1}.
+     *
+     * @param target the DTO whose display fields are refreshed
+     */
+    public static void updatePrettyPrintedFields(final StorageUsageDTO target) {
+        target.setTotalSpace(FormatUtils.formatDataSize(target.getTotalSpaceBytes()));
+        target.setUsedSpace(FormatUtils.formatDataSize(target.getUsedSpaceBytes()));
+        target.setFreeSpace(FormatUtils.formatDataSize(target.getFreeSpaceBytes()));
+
+        if (target.getTotalSpaceBytes() != -1) {
+            final int percentUsed = getUtilization(target.getUsedSpaceBytes(), target.getTotalSpaceBytes());
+            target.setUtilization(FormatUtils.formatUtilization(percentUsed));
+        }
+    }
+
+
+    /**
+     * Merges the garbage collection entries of {@code toMerge} into {@code targetSet}, matching entries
+     * by name. Matching entries are summed in place; entries found only in {@code toMerge} are added to
+     * {@code targetSet}. The call is a no-op when either set is {@code null}: a missing {@code toMerge}
+     * has nothing to contribute, and a missing {@code targetSet} cannot be added to.
+     *
+     * @param targetSet the accumulated entries; modified in place; may be {@code null}
+     * @param toMerge the entries to merge in; may be {@code null}
+     */
+    public static void mergeGarbageCollection(final Set<GarbageCollectionDTO> targetSet, final Set<GarbageCollectionDTO> toMerge) {
+        // callers pass these sets straight from DTO getters, so either may be absent
+        if (targetSet == null || toMerge == null) {
+            return;
+        }
+
+        final Map<String, GarbageCollectionDTO> collectionsByName = new HashMap<>();
+        for (final GarbageCollectionDTO targetCollection : targetSet) {
+            collectionsByName.put(targetCollection.getName(), targetCollection);
+        }
+
+        for (final GarbageCollectionDTO collectionToMerge : toMerge) {
+            final GarbageCollectionDTO targetCollection = collectionsByName.get(collectionToMerge.getName());
+            if (targetCollection == null) {
+                // NOTE(review): the incoming DTO itself is adopted, so later merges will modify an object
+                // the contributing node's snapshot may still reference - confirm whether a copy is needed
+                collectionsByName.put(collectionToMerge.getName(), collectionToMerge);
+            } else {
+                merge(targetCollection, collectionToMerge);
+            }
+        }
+
+        targetSet.clear();
+        targetSet.addAll(collectionsByName.values());
+    }
+
+    /**
+     * Adds the collection count and collection time of {@code toMerge} to {@code target} and
+     * refreshes the target's pretty-printed collection time.
+     *
+     * @param target the accumulated garbage collection statistics; modified in place
+     * @param toMerge the statistics whose values are added to {@code target}
+     */
+    public static void merge(final GarbageCollectionDTO target, final GarbageCollectionDTO toMerge) {
+        target.setCollectionMillis(target.getCollectionMillis() + toMerge.getCollectionMillis());
+        target.setCollectionCount(target.getCollectionCount() + toMerge.getCollectionCount());
+
+        updatePrettyPrintedFields(target);
+    }
+
+    /**
+     * Recomputes the display form of the collection time from the raw millisecond value.
+     *
+     * @param target the DTO whose display field is refreshed
+     */
+    public static void updatePrettyPrintedFields(final GarbageCollectionDTO target) {
+        final String collectionTime = FormatUtils.formatHoursMinutesSeconds(target.getCollectionMillis(), TimeUnit.MILLISECONDS);
+        target.setCollectionTime(collectionTime);
+    }
+
+    /**
+     * Merges one node's counters into {@code target}: the aggregate snapshots are merged and an entry
+     * describing the contributing node is appended to the target's node snapshot list, which is
+     * created if the target does not have one yet.
+     *
+     * @param target the accumulated counters; modified in place
+     * @param toMerge the counters reported by the node
+     * @param nodeId identifier of the reporting node
+     * @param nodeAddress address of the reporting node
+     * @param nodeApiPort API port of the reporting node
+     */
+    public static void merge(final CountersDTO target, final CountersDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
+        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());
+
+        final NodeCountersSnapshotDTO perNode = new NodeCountersSnapshotDTO();
+        perNode.setSnapshot(toMerge.getAggregateSnapshot());
+        perNode.setApiPort(nodeApiPort);
+        perNode.setAddress(nodeAddress);
+        perNode.setNodeId(nodeId);
+
+        final List<NodeCountersSnapshotDTO> existing = target.getNodeSnapshots();
+        final List<NodeCountersSnapshotDTO> snapshots = (existing == null) ? new ArrayList<NodeCountersSnapshotDTO>() : existing;
+        snapshots.add(perNode);
+        target.setNodeSnapshots(snapshots);
+    }
+
+    /**
+     * Merges the counters of {@code toMerge} into {@code target}, matching counters by id. Matching
+     * counters are summed in place; counters found only in {@code toMerge} are added. The call is a
+     * no-op when either snapshot is {@code null}, and a {@code null} counter collection on either side
+     * is treated as empty.
+     *
+     * @param target the accumulated snapshot; its counter collection is replaced with the merged one
+     * @param toMerge the snapshot whose counters are merged in
+     */
+    public static void merge(final CountersSnapshotDTO target, final CountersSnapshotDTO toMerge) {
+        // callers pass aggregate snapshots straight from their getters, so either side may be absent
+        if (target == null || toMerge == null) {
+            return;
+        }
+
+        final Map<String, CounterDTO> counters = new HashMap<>();
+
+        for (final CounterDTO counter : replaceNull(target.getCounters())) {
+            counters.put(counter.getId(), counter);
+        }
+
+        for (final CounterDTO counter : replaceNull(toMerge.getCounters())) {
+            final CounterDTO existing = counters.get(counter.getId());
+            if (existing == null) {
+                // NOTE(review): the incoming DTO itself is adopted, so later merges will modify an object
+                // the contributing node's snapshot may still reference - confirm whether a copy is needed
+                counters.put(counter.getId(), counter);
+            } else {
+                merge(existing, counter);
+            }
+        }
+
+        target.setCounters(counters.values());
+    }
+
+    /**
+     * Adds the raw value of {@code toMerge} to {@code target} and refreshes the target's
+     * formatted value from the new total.
+     *
+     * @param target the accumulated counter; modified in place
+     * @param toMerge the counter whose value is added to {@code target}
+     */
+    public static void merge(final CounterDTO target, final CounterDTO toMerge) {
+        // raw value first: the formatted value is derived from the updated total
+        target.setValueCount(target.getValueCount() + toMerge.getValueCount());
+
+        final String formatted = FormatUtils.formatCount(target.getValueCount());
+        target.setValue(formatted);
+    }
+
+
+    /**
+     * Computes {@code used} as a percentage of {@code total}, rounded to the nearest integer.
+     *
+     * @param used the amount in use
+     * @param total the amount available in total
+     * @return the rounded percentage, or {@code 0} when {@code total} is zero
+     */
+    public static int getUtilization(final double used, final double total) {
+        // Dividing a positive value by zero gives Infinity, which Math.round turns into
+        // Long.MAX_VALUE and the int cast truncates to -1; report 0 for an empty total instead.
+        if (total == 0) {
+            return 0;
+        }
+        return (int) Math.round((used / total) * 100);
+    }
+
+    /**
+     * Formats a count for display, tolerating a missing value.
+     *
+     * @param intStatus the count to format; may be {@code null}
+     * @return {@code "-"} when {@code intStatus} is {@code null}, otherwise the result of
+     *         {@code FormatUtils.formatCount}
+     */
+    public static String formatCount(final Integer intStatus) {
+        if (intStatus == null) {
+            return "-";
+        }
+        return FormatUtils.formatCount(intStatus);
+    }
+
+    /**
+     * Formats a byte size for display, tolerating a missing value.
+     *
+     * @param longStatus the size to format; may be {@code null}
+     * @return {@code "-"} when {@code longStatus} is {@code null}, otherwise the result of
+     *         {@code FormatUtils.formatDataSize}
+     */
+    public static String formatDataSize(final Long longStatus) {
+        if (longStatus == null) {
+            return "-";
+        }
+        return FormatUtils.formatDataSize(longStatus);
+    }
+
+    /**
+     * Builds the combined "count / size" display string from a flowfile count and a byte size.
+     * Either value may be {@code null}; its half of the string is then whatever
+     * {@link #formatCount(Integer)} or {@link #formatDataSize(Long)} produces for {@code null}.
+     *
+     * @param count the flowfile count; may be {@code null}
+     * @param bytes the size in bytes; may be {@code null}
+     * @return the formatted count and the formatted size separated by {@code " / "}
+     */
+    public static String prettyPrint(final Integer count, final Long bytes) {
+        final String countPart = formatCount(count);
+        final String sizePart = formatDataSize(bytes);
+        return countPart + " / " + sizePart;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
index d3d5559..3966a31 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
@@ -46,7 +46,7 @@ public class ClusteredEventAccess implements EventAccess {
@Override
public ProcessGroupStatus getControllerStatus() {
- return clusterManager.getProcessGroupStatus(WebClusterManager.ROOT_GROUP_ID_ALIAS);
+ return new ProcessGroupStatus();
}
@Override