You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/10/03 17:58:28 UTC
nifi git commit: NIFI-2825: Fix S2S getPeers flow file count
Repository: nifi
Updated Branches:
refs/heads/master 34e5a5321 -> 17a36c6fd
NIFI-2825: Fix S2S getPeers flow file count
- Added ClusterWorkload message to retrieve workload information from a
cluster coordinator
- Use cluster workload to return queued flow file count to site-to-site
client so that it can calculate distribution of data transfer
This closes #1084.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/17a36c6f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/17a36c6f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/17a36c6f
Branch: refs/heads/master
Commit: 17a36c6fd525dba837e13a71a52ddfd224782fae
Parents: 34e5a53
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Sep 28 11:07:22 2016 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Oct 3 13:28:01 2016 -0400
----------------------------------------------------------------------
.../coordination/ClusterCoordinator.java | 8 +++
.../cluster/coordination/node/NodeWorkload.java | 71 ++++++++++++++++++++
.../protocol/AbstractNodeProtocolSender.java | 17 +++++
.../cluster/protocol/NodeProtocolSender.java | 10 +++
.../impl/NodeProtocolSenderListener.java | 7 ++
.../protocol/jaxb/message/ObjectFactory.java | 11 +++
.../message/ClusterWorkloadRequestMessage.java | 30 +++++++++
.../message/ClusterWorkloadResponseMessage.java | 43 ++++++++++++
.../protocol/message/ProtocolMessage.java | 4 +-
.../jaxb/message/TestJaxbProtocolUtils.java | 56 +++++++++++++++
.../ClusterProtocolHeartbeatMonitor.java | 39 +++++++++--
.../node/NodeClusterCoordinator.java | 15 ++++-
.../NodeClusterCoordinatorFactoryBean.java | 5 +-
.../heartbeat/TestAbstractHeartbeatMonitor.java | 6 ++
.../node/TestNodeClusterCoordinator.java | 8 ++-
.../apache/nifi/cluster/integration/Node.java | 3 +-
.../ClusterCoordinatorNodeInformant.java | 23 ++++---
.../apache/nifi/web/api/SiteToSiteResource.java | 31 +++++----
.../nifi/web/api/TestSiteToSiteResource.java | 63 +++++++++++++++++
19 files changed, 414 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index 723a374..1083fe6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -17,6 +17,7 @@
package org.apache.nifi.cluster.coordination;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -24,6 +25,7 @@ import java.util.Set;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.reporting.Severity;
@@ -236,4 +238,10 @@ public interface ClusterCoordinator {
* @return the current status of Flow Election.
*/
String getFlowElectionStatus();
+
+ /**
+ * @return the current cluster workload retrieved from the cluster coordinator.
+ * @throws IOException if communication with the cluster coordinator fails.
+ */
+ Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeWorkload.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeWorkload.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeWorkload.java
new file mode 100644
index 0000000..be5653e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeWorkload.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "nodeWorkload")
+public class NodeWorkload {
+
+ private long reportedTimestamp;
+ private int flowFileCount;
+ private long flowFileBytes;
+ private int activeThreadCount;
+ private long systemStartTime;
+
+ public long getReportedTimestamp() {
+ return reportedTimestamp;
+ }
+
+ public void setReportedTimestamp(long reportedTimestamp) {
+ this.reportedTimestamp = reportedTimestamp;
+ }
+
+ public int getFlowFileCount() {
+ return flowFileCount;
+ }
+
+ public void setFlowFileCount(int flowFileCount) {
+ this.flowFileCount = flowFileCount;
+ }
+
+ public long getFlowFileBytes() {
+ return flowFileBytes;
+ }
+
+ public void setFlowFileBytes(long flowFileBytes) {
+ this.flowFileBytes = flowFileBytes;
+ }
+
+ public int getActiveThreadCount() {
+ return activeThreadCount;
+ }
+
+ public void setActiveThreadCount(int activeThreadCount) {
+ this.activeThreadCount = activeThreadCount;
+ }
+
+ public long getSystemStartTime() {
+ return systemStartTime;
+ }
+
+ public void setSystemStartTime(long systemStartTime) {
+ this.systemStartTime = systemStartTime;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
index 22d6ebc..db3fc1d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@@ -94,6 +96,21 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
throw new ProtocolException("Expected message type '" + MessageType.HEARTBEAT_RESPONSE + "' but found '" + responseMessage.getType() + "'");
}
+ @Override
+ public ClusterWorkloadResponseMessage clusterWorkload(final ClusterWorkloadRequestMessage msg) throws ProtocolException {
+ final InetSocketAddress serviceAddress;
+ try {
+ serviceAddress = getServiceAddress();
+ } catch (IOException e) {
+ throw new ProtocolException("Failed to getServiceAddress due to " + e, e);
+ }
+ final ProtocolMessage responseMessage = sendProtocolMessage(msg, serviceAddress.getHostName(), serviceAddress.getPort());
+ if (MessageType.CLUSTER_WORKLOAD_RESPONSE == responseMessage.getType()) {
+ return (ClusterWorkloadResponseMessage) responseMessage;
+ }
+
+ throw new ProtocolException("Expected message type '" + MessageType.CLUSTER_WORKLOAD_RESPONSE + "' but found '" + responseMessage.getType() + "'");
+ }
private Socket createSocket() {
InetSocketAddress socketAddress = null;
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
index fcf5195..bfcc62c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.cluster.protocol;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@@ -49,4 +51,12 @@ public interface NodeProtocolSender {
* @return the response from the Cluster Coordinator
*/
HeartbeatResponseMessage heartbeat(HeartbeatMessage msg, String address) throws ProtocolException;
+
+ /**
+ * Sends a "cluster workload request" message to the Cluster Coordinator.
+ * @param msg a request message
+ * @return the response from the Cluster Coordinator
+ * @throws ProtocolException if communication failed
+ */
+ ClusterWorkloadResponseMessage clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
index 1b0aeea..d13b3d3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
@@ -24,6 +24,8 @@ import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@@ -96,4 +98,9 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL
public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
return sender.heartbeat(msg, address);
}
+
+ @Override
+ public ClusterWorkloadResponseMessage clusterWorkload(ClusterWorkloadRequestMessage msg) throws ProtocolException {
+ return sender.clusterWorkload(msg);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index afa87b9..9a594a4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -25,6 +25,8 @@ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
@@ -101,4 +103,13 @@ public class ObjectFactory {
public HeartbeatResponseMessage createHeartbeatResponse() {
return new HeartbeatResponseMessage();
}
+
+ public ClusterWorkloadRequestMessage createClusterWorkloadRequest() {
+ return new ClusterWorkloadRequestMessage();
+ }
+
+ public ClusterWorkloadResponseMessage createClusterWorkloadResponse() {
+ return new ClusterWorkloadResponseMessage();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadRequestMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadRequestMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadRequestMessage.java
new file mode 100644
index 0000000..d8f87a4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadRequestMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "clusterWorkloadRequest")
+public class ClusterWorkloadRequestMessage extends ProtocolMessage {
+
+ @Override
+ public MessageType getType() {
+ return MessageType.CLUSTER_WORKLOAD_REQUEST;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadResponseMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadResponseMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadResponseMessage.java
new file mode 100644
index 0000000..2852c48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ClusterWorkloadResponseMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.message;
+
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Map;
+
+@XmlRootElement(name = "clusterWorkloadResponse")
+public class ClusterWorkloadResponseMessage extends ProtocolMessage {
+
+ private Map<NodeIdentifier, NodeWorkload> nodeWorkloads;
+
+ @Override
+ public MessageType getType() {
+ return MessageType.CLUSTER_WORKLOAD_RESPONSE;
+ }
+
+ public Map<NodeIdentifier, NodeWorkload> getNodeWorkloads() {
+ return nodeWorkloads;
+ }
+
+ public void setNodeWorkloads(Map<NodeIdentifier, NodeWorkload> nodeWorkloads) {
+ this.nodeWorkloads = nodeWorkloads;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index 1d0d115..1cab62f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -35,7 +35,9 @@ public abstract class ProtocolMessage {
HEARTBEAT_RESPONSE,
NODE_CONNECTION_STATUS_REQUEST,
NODE_CONNECTION_STATUS_RESPONSE,
- NODE_STATUS_CHANGE;
+ NODE_STATUS_CHANGE,
+ CLUSTER_WORKLOAD_REQUEST,
+ CLUSTER_WORKLOAD_RESPONSE
}
public abstract MessageType getType();
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
index 4fa53e8..8c2cca6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
@@ -23,12 +23,16 @@ import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
import javax.xml.bind.JAXBException;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.protocol.ComponentRevision;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
@@ -38,6 +42,8 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.web.Revision;
@@ -124,4 +130,54 @@ public class TestJaxbProtocolUtils {
final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
assertTrue(unmarshalled instanceof HeartbeatMessage);
}
+
+ @Test
+ public void testRoundTripClusterWorkloadRequest() throws JAXBException {
+ final ClusterWorkloadRequestMessage msg = new ClusterWorkloadRequestMessage();
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
+ final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
+ assertTrue(unmarshalled instanceof ClusterWorkloadRequestMessage);
+ }
+
+ @Test
+ public void testRoundTripClusterWorkloadResponse() throws JAXBException {
+ final ClusterWorkloadResponseMessage msg = new ClusterWorkloadResponseMessage();
+ final Map<NodeIdentifier, NodeWorkload> expectedNodeWorkloads = new HashMap<>();
+
+ IntStream.range(1, 4).forEach(i -> {
+ final String hostname = "node" + i;
+ final NodeIdentifier nodeId = new NodeIdentifier(hostname, hostname, 8080, hostname, 8081, hostname, 8082, 8083, false);
+ final NodeWorkload workload = new NodeWorkload();
+ workload.setReportedTimestamp(System.currentTimeMillis() - 1000);
+ workload.setSystemStartTime(System.currentTimeMillis());
+ workload.setActiveThreadCount(i);
+ workload.setFlowFileCount(i * 10);
+ workload.setFlowFileBytes(i * 10 * 1024);
+ expectedNodeWorkloads.put(nodeId, workload);
+ });
+ msg.setNodeWorkloads(expectedNodeWorkloads);
+
+ // Marshall.
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
+
+ // Un-marshall.
+ final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
+ assertTrue(unmarshalled instanceof ClusterWorkloadResponseMessage);
+
+ // Assert result.
+ final ClusterWorkloadResponseMessage response = (ClusterWorkloadResponseMessage) unmarshalled;
+ assertEquals(expectedNodeWorkloads.size(), response.getNodeWorkloads().size());
+ response.getNodeWorkloads().entrySet().stream().forEach(entry -> {
+ assertTrue(expectedNodeWorkloads.containsKey(entry.getKey()));
+ final NodeWorkload w = entry.getValue();
+ NodeWorkload expectedW = expectedNodeWorkloads.get(entry.getKey());
+ assertEquals(expectedW.getActiveThreadCount(), w.getActiveThreadCount());
+ assertEquals(expectedW.getReportedTimestamp(), w.getReportedTimestamp());
+ assertEquals(expectedW.getSystemStartTime(), w.getSystemStartTime());
+ assertEquals(expectedW.getFlowFileBytes(), w.getFlowFileBytes());
+ assertEquals(expectedW.getFlowFileCount(), w.getFlowFileCount());
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index 9f620d9..6a8e575 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -18,6 +18,7 @@ package org.apache.nifi.cluster.coordination.heartbeat;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -30,6 +31,7 @@ import javax.xml.bind.Unmarshaller;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -38,6 +40,8 @@ import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.util.NiFiProperties;
@@ -130,11 +134,18 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
@Override
public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
- if (msg.getType() != MessageType.HEARTBEAT) {
- throw new ProtocolException("Cannot handle message of type " + msg.getType());
+ switch (msg.getType()) {
+ case HEARTBEAT:
+ return handleHeartbeat((HeartbeatMessage) msg);
+ case CLUSTER_WORKLOAD_REQUEST:
+ return handleClusterWorkload((ClusterWorkloadRequestMessage) msg);
+ default:
+ throw new ProtocolException("Cannot handle message of type " + msg.getType());
}
+ }
- final HeartbeatMessage heartbeatMsg = (HeartbeatMessage) msg;
+ private ProtocolMessage handleHeartbeat(final HeartbeatMessage msg) {
+ final HeartbeatMessage heartbeatMsg = msg;
final Heartbeat heartbeat = heartbeatMsg.getHeartbeat();
final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
@@ -169,6 +180,26 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
return responseMessage;
}
+ private ProtocolMessage handleClusterWorkload(final ClusterWorkloadRequestMessage msg) {
+
+ final ClusterWorkloadResponseMessage response = new ClusterWorkloadResponseMessage();
+ final Map<NodeIdentifier, NodeWorkload> workloads = new HashMap<>();
+ getLatestHeartbeats().values().stream()
+ .filter(hb -> NodeConnectionState.CONNECTED.equals(hb.getConnectionStatus().getState()))
+ .forEach(hb -> {
+ NodeWorkload wl = new NodeWorkload();
+ wl.setReportedTimestamp(hb.getTimestamp());
+ wl.setSystemStartTime(hb.getSystemStartTime());
+ wl.setActiveThreadCount(hb.getActiveThreadCount());
+ wl.setFlowFileCount(hb.getFlowFileCount());
+ wl.setFlowFileBytes(hb.getFlowFileBytes());
+ workloads.put(hb.getNodeIdentifier(), wl);
+ });
+ response.setNodeWorkloads(workloads);
+
+ return response;
+ }
+
private List<NodeConnectionStatus> getUpdatedStatuses(final List<NodeConnectionStatus> nodeStatusList) {
// Map node's statuses by NodeIdentifier for quick & easy lookup
final Map<NodeIdentifier, NodeConnectionStatus> nodeStatusMap = nodeStatusList.stream()
@@ -201,6 +232,6 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
@Override
public boolean canHandle(ProtocolMessage msg) {
- return msg.getType() == MessageType.HEARTBEAT;
+ return msg.getType() == MessageType.HEARTBEAT || msg.getType() == MessageType.CLUSTER_WORKLOAD_REQUEST;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index a6a6009..4b74e1b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -49,10 +49,13 @@ import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
@@ -88,6 +91,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private final LeaderElectionManager leaderElectionManager;
private final AtomicLong latestUpdateId = new AtomicLong(-1);
private final FlowElection flowElection;
+ private final NodeProtocolSender nodeProtocolSender;
private volatile FlowService flowService;
private volatile boolean connected;
@@ -98,7 +102,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager,
- final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties) {
+ final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties,
+ final NodeProtocolSender nodeProtocolSender) {
this.senderListener = senderListener;
this.flowService = null;
this.eventReporter = eventReporter;
@@ -107,6 +112,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
this.nifiProperties = nifiProperties;
this.leaderElectionManager = leaderElectionManager;
this.flowElection = flowElection;
+ this.nodeProtocolSender = nodeProtocolSender;
senderListener.addHandler(this);
}
@@ -1043,4 +1049,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
public boolean isConnected() {
return connected;
}
+
+ /**
+ * Sends a {@link ClusterWorkloadRequestMessage} through the node protocol sender and
+ * returns the per-node workloads carried by the response.
+ *
+ * @return the node workloads from the response, keyed by node identifier
+ * @throws IOException as declared by the overridden method
+ */
+ @Override
+ public Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException {
+ final ClusterWorkloadResponseMessage workloadResponse = nodeProtocolSender.clusterWorkload(new ClusterWorkloadRequestMessage());
+ return workloadResponse.getNodeWorkloads();
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
index 2845a01..ac79a42 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
@@ -20,6 +20,7 @@ package org.apache.nifi.cluster.spring;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.events.EventReporter;
@@ -46,8 +47,8 @@ public class NodeClusterCoordinatorFactoryBean implements FactoryBean<NodeCluste
final RevisionManager revisionManager = applicationContext.getBean("revisionManager", RevisionManager.class);
final LeaderElectionManager electionManager = applicationContext.getBean("leaderElectionManager", LeaderElectionManager.class);
final FlowElection flowElection = applicationContext.getBean("flowElection", FlowElection.class);
-
- nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, clusterFirewall, revisionManager, properties);
+ final NodeProtocolSender nodeProtocolSender = applicationContext.getBean("nodeProtocolSender", NodeProtocolSender.class);
+ nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, clusterFirewall, revisionManager, properties, nodeProtocolSender);
}
return nodeClusterCoordinator;
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index e83999a..690cda8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -36,6 +36,7 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.reporting.Severity;
@@ -329,6 +330,11 @@ public class TestAbstractHeartbeatMonitor {
public String getFlowElectionStatus() {
return null;
}
+
+ @Override
+ public Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException {
+ return null; // test stub: this coordinator implementation supplies no workload data
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index be9b862..e2577a9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -80,7 +80,7 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
- coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
+ coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties(), null) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
nodeStatuses.add(updatedStatus);
@@ -135,7 +135,8 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
- final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
+ final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(),
+ null, revisionManager, createProperties(), null) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
}
@@ -175,7 +176,8 @@ public class TestNodeClusterCoordinator {
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
- final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
+ final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(),
+ null, revisionManager, createProperties(), null) {
@Override
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index 93c9397..7c74680 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -277,7 +277,8 @@ public class Node {
}
final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener);
- return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null, revisionManager, nodeProperties);
+ return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null,
+ revisionManager, nodeProperties, protocolSender);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
index 9f8439c..95381af 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
@@ -17,12 +17,11 @@
package org.apache.nifi.controller;
-import java.util.ArrayList;
+import java.io.IOException;
import java.util.List;
-import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformant;
@@ -37,14 +36,16 @@ public class ClusterCoordinatorNodeInformant implements NodeInformant {
@Override
public ClusterNodeInformation getNodeInformation() {
- final List<NodeInformation> nodeInfoCollection = new ArrayList<>();
- final Set<NodeIdentifier> nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
-
- // TODO: Get total number of FlowFiles for each node
- for (final NodeIdentifier nodeId : nodeIds) {
- final NodeInformation nodeInfo = new NodeInformation(nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(),
- nodeId.getSiteToSiteHttpApiPort(), nodeId.getApiPort(), nodeId.isSiteToSiteSecure(), 0);
- nodeInfoCollection.add(nodeInfo);
+ final List<NodeInformation> nodeInfoCollection;
+ try {
+ nodeInfoCollection = clusterCoordinator.getClusterWorkload().entrySet().stream().map(entry -> {
+ final NodeIdentifier nodeId = entry.getKey();
+ final NodeInformation nodeInfo = new NodeInformation(nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(),
+ nodeId.getSiteToSiteHttpApiPort(), nodeId.getApiPort(), nodeId.isSiteToSiteSecure(), entry.getValue().getFlowFileCount());
+ return nodeInfo;
+ }).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to retrieve cluster workload due to " + e, e);
}
final ClusterNodeInformation nodeInfo = new ClusterNodeInformation();
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
index 63e5a35..efb1c26 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
@@ -34,7 +34,7 @@ import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.VersionNegotiator;
@@ -59,13 +59,13 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import static org.apache.commons.lang3.StringUtils.isEmpty;
@@ -216,18 +216,23 @@ public class SiteToSiteResource extends ApplicationResource {
final List<PeerDTO> peers = new ArrayList<>();
if (properties.isNode()) {
- final Set<NodeIdentifier> nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
-
- // TODO: Get total number of FlowFiles for each node
- for (final NodeIdentifier nodeId : nodeIds) {
- final PeerDTO peer = new PeerDTO();
- final String siteToSiteAddress = nodeId.getSiteToSiteAddress();
- peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress);
- peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort());
- peer.setSecure(nodeId.isSiteToSiteSecure());
- peer.setFlowFileCount(0);
- peers.add(peer);
+
+ try {
+ final Map<NodeIdentifier, NodeWorkload> clusterWorkload = clusterCoordinator.getClusterWorkload();
+ clusterWorkload.entrySet().stream().forEach(entry -> {
+ final PeerDTO peer = new PeerDTO();
+ final NodeIdentifier nodeId = entry.getKey();
+ final String siteToSiteAddress = nodeId.getSiteToSiteAddress();
+ peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress);
+ peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort());
+ peer.setSecure(nodeId.isSiteToSiteSecure());
+ peer.setFlowFileCount(entry.getValue().getFlowFileCount());
+ peers.add(peer);
+ });
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to retrieve cluster workload due to " + e, e);
}
+
} else {
// Standalone mode.
final PeerDTO peer = new PeerDTO();
http://git-wip-us.apache.org/repos/asf/nifi/blob/17a36c6f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java
index 60a7ba9..5f3aef3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestSiteToSiteResource.java
@@ -16,6 +16,9 @@
*/
package org.apache.nifi.web.api;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.NodeWorkload;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.protocol.http.HttpHeaders;
import org.apache.nifi.util.NiFiProperties;
@@ -30,12 +33,17 @@ import org.junit.Test;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestSiteToSiteResource {
@@ -116,6 +124,47 @@ public class TestSiteToSiteResource {
assertEquals(1, resultEntity.getPeers().size());
}
+ /**
+ * Verifies that, for a resource configured as a cluster node, getPeers returns one peer per
+ * entry of the coordinator's cluster workload and that each peer reports the flow file
+ * count of the matching node workload.
+ */
+ @Test
+ public void testPeersClustered() throws Exception {
+ final HttpServletRequest req = createCommonHttpServletRequest();
+ final NiFiServiceFacade serviceFacade = mock(NiFiServiceFacade.class);
+ final SiteToSiteResource resource = getSiteToSiteResourceClustered(serviceFacade);
+ final ClusterCoordinator clusterCoordinator = mock(ClusterCoordinator.class);
+
+ // Workloads are indexed twice: by node id (what the coordinator returns) and by
+ // "hostname:port" so that each returned peer can be matched back to its workload.
+ final Map<String, NodeWorkload> workloadsByHostPort = new HashMap<>();
+ final Map<NodeIdentifier, NodeWorkload> workloadsByNode = new HashMap<>();
+ IntStream.rangeClosed(1, 3).forEach(i -> {
+ final String hostname = "node" + i;
+ final int siteToSiteHttpApiPort = 8110 + i;
+ final NodeIdentifier nodeId = new NodeIdentifier(hostname, hostname, 8080 + i, hostname, 8090 + i, hostname, 8100 + i, siteToSiteHttpApiPort, false);
+
+ final NodeWorkload nodeWorkload = new NodeWorkload();
+ nodeWorkload.setReportedTimestamp(System.currentTimeMillis() - i);
+ nodeWorkload.setFlowFileBytes(1024 * i);
+ nodeWorkload.setFlowFileCount(10 * i);
+ nodeWorkload.setActiveThreadCount(i);
+ nodeWorkload.setSystemStartTime(System.currentTimeMillis() - (1000 * i));
+
+ workloadsByNode.put(nodeId, nodeWorkload);
+ workloadsByHostPort.put(hostname + ":" + siteToSiteHttpApiPort, nodeWorkload);
+ });
+ when(clusterCoordinator.getClusterWorkload()).thenReturn(workloadsByNode);
+ resource.setClusterCoordinator(clusterCoordinator);
+
+ final Response response = resource.getPeers(req);
+ final PeersEntity peersEntity = (PeersEntity) response.getEntity();
+
+ assertEquals(200, response.getStatus());
+ assertEquals(3, peersEntity.getPeers().size());
+
+ // Every returned peer must correspond to a mocked node and carry its flow file count.
+ peersEntity.getPeers().forEach(peer -> {
+ final NodeWorkload expected = workloadsByHostPort.get(peer.getHostname() + ":" + peer.getPort());
+ assertNotNull(expected);
+ assertEquals(expected.getFlowFileCount(), peer.getFlowFileCount());
+ });
+ }
+
@Test
public void testPeersVersionWasNotSpecified() throws Exception {
@@ -160,4 +209,18 @@ public class TestSiteToSiteResource {
resource.setServiceFacade(serviceFacade);
return resource;
}
+
+ /**
+ * Builds a SiteToSiteResource whose properties set {@code NiFiProperties.CLUSTER_IS_NODE}
+ * to "true", with site-to-site authorization overridden to do nothing.
+ *
+ * @param serviceFacade the service facade to set on the resource
+ * @return the configured resource
+ */
+ private SiteToSiteResource getSiteToSiteResourceClustered(final NiFiServiceFacade serviceFacade) {
+ final Map<String, String> additionalProperties = new HashMap<>();
+ additionalProperties.put(NiFiProperties.CLUSTER_IS_NODE, "true");
+ final NiFiProperties clusteredProperties = NiFiProperties.createBasicNiFiProperties(null, additionalProperties);
+
+ final SiteToSiteResource clusteredResource = new SiteToSiteResource(clusteredProperties) {
+ @Override
+ protected void authorizeSiteToSite() {
+ // Intentionally empty: the authorization step is a no-op for this test resource.
+ }
+ };
+ clusteredResource.setProperties(clusteredProperties);
+ clusteredResource.setServiceFacade(serviceFacade);
+ return clusteredResource;
+ }
}
\ No newline at end of file