You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:29:09 UTC
[13/14] nifi git commit: NIFI-5516: Implement Load-Balanced
Connections Refactoring StandardFlowFileQueue to have an
AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added
documentation, cleaned up code some Refactored FlowFileQueue so th
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
index e31a547..9eaffd3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
@@ -16,15 +16,6 @@
*/
package org.apache.nifi.cluster.protocol.impl;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.security.cert.CertificateException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
@@ -49,6 +40,22 @@ import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocket;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
+
/**
* Implements a listener for protocol messages sent over unicast socket.
*
@@ -82,7 +89,6 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
@Override
public void start() throws IOException {
-
if (super.isRunning()) {
throw new IllegalStateException("Instance is already started.");
}
@@ -92,7 +98,6 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
@Override
public void stop() throws IOException {
-
if (super.isRunning() == false) {
throw new IOException("Instance is already stopped.");
}
@@ -128,8 +133,6 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
final String requestId = UUID.randomUUID().toString();
logger.debug("Received request {} from {}", requestId, hostname);
- String requestorDn = getRequestorDN(socket);
-
// unmarshall message
final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
final ByteCountingInputStream countingIn = new ByteCountingInputStream(socket.getInputStream());
@@ -151,7 +154,7 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
}
}
- request.setRequestorDN(requestorDn);
+ final Set<String> nodeIdentities = getCertificateIdentities(socket);
// dispatch message to handler
ProtocolHandler desiredHandler = null;
@@ -168,7 +171,7 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
logger.error("Received request of type {} but none of the following Protocol Handlers were able to process the request: {}", request.getType(), handlers);
throw new ProtocolException("No handler assigned to handle message type: " + request.getType());
} else {
- final ProtocolMessage response = desiredHandler.handle(request);
+ final ProtocolMessage response = desiredHandler.handle(request, nodeIdentities);
if (response != null) {
try {
logger.debug("Sending response for request {}", requestId);
@@ -218,11 +221,32 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
}
}
- private String getRequestorDN(Socket socket) {
- try {
- return CertificateUtils.extractPeerDNFromSSLSocket(socket);
- } catch (CertificateException e) {
- throw new ProtocolException(e);
+ private Set<String> getCertificateIdentities(final Socket socket) throws IOException {
+ if (socket instanceof SSLSocket) {
+ try {
+ final SSLSession sslSession = ((SSLSocket) socket).getSession();
+ return getCertificateIdentities(sslSession);
+ } catch (CertificateException e) {
+ throw new IOException("Could not extract Subject Alternative Names from client's certificate", e);
+ }
+ } else {
+ return Collections.emptySet();
+ }
+ }
+
+ private Set<String> getCertificateIdentities(final SSLSession sslSession) throws CertificateException, SSLPeerUnverifiedException {
+ final Certificate[] certs = sslSession.getPeerCertificates();
+ if (certs == null || certs.length == 0) {
+ throw new SSLPeerUnverifiedException("No certificates found");
}
+
+ final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certs[0]);
+ cert.checkValidity();
+
+ final Set<String> identities = CertificateUtils.getSubjectAlternativeNames(cert).stream()
+ .map(CertificateUtils::extractUsername)
+ .collect(Collectors.toSet());
+
+ return identities;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
index a2d9968..dbc988b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
@@ -25,6 +25,8 @@ public class AdaptedNodeIdentifier {
private int apiPort;
private String socketAddress;
private int socketPort;
+ private String loadBalanceAddress;
+ private int loadBalancePort;
private String siteToSiteAddress;
private Integer siteToSitePort;
private Integer siteToSiteHttpApiPort;
@@ -74,6 +76,22 @@ public class AdaptedNodeIdentifier {
this.socketPort = socketPort;
}
+ public String getLoadBalanceAddress() {
+ return loadBalanceAddress;
+ }
+
+ public void setLoadBalanceAddress(final String loadBalanceAddress) {
+ this.loadBalanceAddress = loadBalanceAddress;
+ }
+
+ public int getLoadBalancePort() {
+ return loadBalancePort;
+ }
+
+ public void setLoadBalancePort(final int loadBalancePort) {
+ this.loadBalancePort = loadBalancePort;
+ }
+
public String getSiteToSiteAddress() {
return siteToSiteAddress;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
index 4a2660f..29aa451 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
@@ -34,6 +34,8 @@ public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, Nod
aNi.setApiPort(ni.getApiPort());
aNi.setSocketAddress(ni.getSocketAddress());
aNi.setSocketPort(ni.getSocketPort());
+ aNi.setLoadBalanceAddress(ni.getLoadBalanceAddress());
+ aNi.setLoadBalancePort(ni.getLoadBalancePort());
aNi.setSiteToSiteAddress(ni.getSiteToSiteAddress());
aNi.setSiteToSitePort(ni.getSiteToSitePort());
aNi.setSiteToSiteHttpApiPort(ni.getSiteToSiteHttpApiPort());
@@ -47,7 +49,7 @@ public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, Nod
if (aNi == null) {
return null;
} else {
- return new NodeIdentifier(aNi.getId(), aNi.getApiAddress(), aNi.getApiPort(), aNi.getSocketAddress(), aNi.getSocketPort(),
+ return new NodeIdentifier(aNi.getId(), aNi.getApiAddress(), aNi.getApiPort(), aNi.getSocketAddress(), aNi.getSocketPort(), aNi.getLoadBalanceAddress(), aNi.getLoadBalancePort(),
aNi.getSiteToSiteAddress(), aNi.getSiteToSitePort(),aNi.getSiteToSiteHttpApiPort(), aNi.isSiteToSiteSecure());
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index 1cab62f..482f5d6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -18,8 +18,6 @@ package org.apache.nifi.cluster.protocol.message;
public abstract class ProtocolMessage {
- private volatile String requestorDN;
-
public static enum MessageType {
CONNECTION_REQUEST,
CONNECTION_RESPONSE,
@@ -42,21 +40,4 @@ public abstract class ProtocolMessage {
public abstract MessageType getType();
- /**
- * Sets the DN of the entity making the request
- *
- * @param dn dn of the entity making the request
- */
- public void setRequestorDN(final String dn) {
- this.requestorDN = dn;
- }
-
- /**
- * @return the DN of the entity that made the request, if using a secure
- * socket. Otherwise, returns <code>null</code>
- */
- public String getRequestorDN() {
- return requestorDN;
- }
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
index 63ab689..e2d5bf2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
@@ -27,17 +27,17 @@
<util:constant static-field="org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils.JAXB_CONTEXT"/>
</constructor-arg>
</bean>
-
+
<!-- socket configuration -->
<bean id="protocolSocketConfiguration" class="org.apache.nifi.cluster.protocol.spring.SocketConfigurationFactoryBean">
<property name="properties" ref="nifiProperties"/>
</bean>
-
+
<!-- server socket configuration -->
<bean id="protocolServerSocketConfiguration" class="org.apache.nifi.cluster.protocol.spring.ServerSocketConfigurationFactoryBean">
<property name="properties" ref="nifiProperties"/>
</bean>
-
+
<!-- cluster manager protocol sender -->
<bean id="clusterCoordinationProtocolSender" class="org.apache.nifi.cluster.protocol.impl.StandardClusterCoordinationProtocolSender">
<constructor-arg ref="protocolSocketConfiguration"/>
@@ -49,13 +49,13 @@
<bean factory-bean="nifiProperties" factory-method="getClusterNodeConnectionTimeout"/>
</property>
</bean>
-
+
<!-- cluster manager sender/listener -->
<bean id="clusterCoordinationProtocolSenderListener" class="org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener">
<constructor-arg ref="clusterCoordinationProtocolSender"/>
<constructor-arg ref="protocolListener"/>
</bean>
-
+
<!-- node protocol sender -->
<!--
<bean id="nodeProtocolSender" class="org.apache.nifi.cluster.coordination.node.CuratorNodeProtocolSender">
@@ -69,7 +69,7 @@
<constructor-arg ref="protocolContext"/>
<constructor-arg ref="leaderElectionManager"/>
</bean>
-
+
<!-- protocol listener -->
<bean id="protocolListener" class="org.apache.nifi.cluster.protocol.impl.SocketProtocolListener">
<constructor-arg index="0">
@@ -81,7 +81,7 @@
<constructor-arg ref="protocolServerSocketConfiguration" index="2"/>
<constructor-arg ref="protocolContext" index="3"/>
</bean>
-
+
<!-- node sender/listener -->
<bean id="nodeProtocolSenderListener" class="org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener">
<constructor-arg ref="nodeProtocolSender"/>
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
index d6d83ef..aff4b11 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
@@ -16,12 +16,14 @@
*/
package org.apache.nifi.cluster.protocol.impl.testutils;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
/**
*/
public class DelayedProtocolHandler implements ProtocolHandler {
@@ -34,7 +36,7 @@ public class DelayedProtocolHandler implements ProtocolHandler {
}
@Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+ public ProtocolMessage handle(ProtocolMessage msg, Set<String> nodeIdentities) throws ProtocolException {
try {
messages.add(msg);
Thread.sleep(delay);
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
index ccf2c4c..05d5b77 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
@@ -16,12 +16,14 @@
*/
package org.apache.nifi.cluster.protocol.impl.testutils;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
/**
*/
public class ReflexiveProtocolHandler implements ProtocolHandler {
@@ -29,7 +31,7 @@ public class ReflexiveProtocolHandler implements ProtocolHandler {
private List<ProtocolMessage> messages = new ArrayList<>();
@Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+ public ProtocolMessage handle(ProtocolMessage msg, Set<String> nodeIdentities) throws ProtocolException {
messages.add(msg);
return msg;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 35bf510..4395883 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -29,6 +29,7 @@ import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -198,7 +199,7 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
// Do not process heartbeat if it's blocked by firewall.
- if (clusterCoordinator.isBlockedByFirewall(nodeId.getSocketAddress())) {
+ if (clusterCoordinator.isBlockedByFirewall(Collections.singleton(nodeId.getSocketAddress()))) {
clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Firewall blocked received heartbeat. Issuing disconnection request.");
// request node to disconnect
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index 2d6f023..43f3f2b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -16,16 +16,6 @@
*/
package org.apache.nifi.cluster.coordination.heartbeat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -46,6 +36,17 @@ import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
/**
* Uses Apache ZooKeeper to advertise the address to send heartbeats to, and
* then relies on the NiFi Cluster Protocol to receive heartbeat messages from
@@ -134,7 +135,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
}
@Override
- public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
+ public ProtocolMessage handle(final ProtocolMessage msg, Set<String> nodeIds) throws ProtocolException {
switch (msg.getType()) {
case HEARTBEAT:
return handleHeartbeat((HeartbeatMessage) msg);
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 4e4625c..484d155 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -16,13 +16,19 @@
*/
package org.apache.nifi.cluster.coordination.node;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
+import org.apache.nifi.cluster.coordination.node.state.NodeIdentifierDescriptor;
import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
@@ -49,8 +55,14 @@ import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
+import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.NiFiProperties;
@@ -59,6 +71,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -69,6 +82,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@@ -93,6 +107,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private final AtomicLong latestUpdateId = new AtomicLong(-1);
private final FlowElection flowElection;
private final NodeProtocolSender nodeProtocolSender;
+ private final StateManager stateManager;
private volatile FlowService flowService;
private volatile boolean connected;
@@ -102,9 +117,18 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
+ private final List<ClusterTopologyEventListener> eventListeners = new CopyOnWriteArrayList<>();
+
+ public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager,
+ final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties,
+ final NodeProtocolSender nodeProtocolSender) throws IOException {
+ this(senderListener, eventReporter, leaderElectionManager, flowElection, firewall, revisionManager, nifiProperties, nodeProtocolSender,
+ StandardStateManagerProvider.create(nifiProperties, VariableRegistry.EMPTY_REGISTRY));
+ }
+
public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager,
final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties,
- final NodeProtocolSender nodeProtocolSender) {
+ final NodeProtocolSender nodeProtocolSender, final StateManagerProvider stateManagerProvider) throws IOException {
this.senderListener = senderListener;
this.flowService = null;
this.eventReporter = eventReporter;
@@ -114,10 +138,98 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
this.leaderElectionManager = leaderElectionManager;
this.flowElection = flowElection;
this.nodeProtocolSender = nodeProtocolSender;
+ this.stateManager = stateManagerProvider.getStateManager("Cluster Coordinator");
+
+ recoverState();
senderListener.addHandler(this);
}
+ private void recoverState() throws IOException {
+ final StateMap stateMap = stateManager.getState(Scope.LOCAL);
+ if (stateMap == null) {
+ logger.debug("No state to restore");
+ return;
+ }
+
+ final ObjectMapper mapper = new ObjectMapper();
+ final JsonFactory jsonFactory = new JsonFactory();
+ jsonFactory.setCodec(mapper);
+
+ final Map<NodeIdentifier, NodeConnectionStatus> connectionStatusMap = new HashMap<>();
+ NodeIdentifier localNodeId = null;
+
+ final Map<String, String> state = stateMap.toMap();
+ for (final Map.Entry<String, String> entry : state.entrySet()) {
+ final String nodeUuid = entry.getKey();
+ final String nodeIdentifierJson = entry.getValue();
+ logger.debug("Recovering state for {} = {}", nodeUuid, nodeIdentifierJson);
+
+ try (final JsonParser jsonParser = jsonFactory.createParser(nodeIdentifierJson)) {
+ final NodeIdentifierDescriptor nodeIdDesc = jsonParser.readValueAs(NodeIdentifierDescriptor.class);
+ final NodeIdentifier nodeId = nodeIdDesc.toNodeIdentifier();
+
+ connectionStatusMap.put(nodeId, new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
+ if (nodeIdDesc.isLocalNodeIdentifier()) {
+ if (localNodeId == null) {
+ localNodeId = nodeId;
+ } else {
+ logger.warn("When recovering state, determined that two Node Identifiers claim to be the local Node Identifier: {} and {}. Will ignore both of these and wait until " +
+ "connecting to cluster to determine which Node Identifier is the local Node Identifier", localNodeId, nodeId);
+ localNodeId = null;
+ }
+ }
+ }
+ }
+
+ if (!connectionStatusMap.isEmpty()) {
+ resetNodeStatuses(connectionStatusMap);
+ }
+
+ if (localNodeId != null) {
+ logger.debug("Recovered state indicating that Local Node Identifier is {}", localNodeId);
+ setLocalNodeIdentifier(localNodeId);
+ }
+ }
+
+ private void storeState() {
+ final ObjectMapper mapper = new ObjectMapper();
+ final JsonFactory jsonFactory = new JsonFactory();
+ jsonFactory.setCodec(mapper);
+
+ try {
+ final Map<String, String> stateMap = new HashMap<>();
+
+ final NodeIdentifier localNodeId = getLocalNodeIdentifier();
+ for (final NodeIdentifier nodeId : getNodeIdentifiers()) {
+ final boolean isLocalId = nodeId.equals(localNodeId);
+ final NodeIdentifierDescriptor descriptor = NodeIdentifierDescriptor.fromNodeIdentifier(nodeId, isLocalId);
+
+ try (final StringWriter writer = new StringWriter()) {
+ final JsonGenerator jsonGenerator = jsonFactory.createGenerator(writer);
+ jsonGenerator.writeObject(descriptor);
+
+ final String serializedDescriptor = writer.toString();
+ stateMap.put(nodeId.getId(), serializedDescriptor);
+ }
+ }
+
+ stateManager.setState(stateMap, Scope.LOCAL);
+ logger.debug("Stored the following state as the Cluster Topology: {}", stateMap);
+ } catch (final Exception e) {
+ logger.warn("Failed to store cluster topology to local State Manager. Upon restart of NiFi, the cluster topology may not be accurate until joining the cluster.", e);
+ }
+ }
+
+
+ public void registerEventListener(final ClusterTopologyEventListener eventListener) {
+ this.eventListeners.add(eventListener);
+ }
+
+ public void unregisterEventListener(final ClusterTopologyEventListener eventListener) {
+ this.eventListeners.remove(eventListener);
+ }
+
@Override
public void shutdown() {
if (closed) {
@@ -136,8 +248,13 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public void setLocalNodeIdentifier(final NodeIdentifier nodeId) {
+ if (nodeId == null || nodeId.equals(this.nodeId)) {
+ return;
+ }
+
this.nodeId = nodeId;
nodeStatuses.computeIfAbsent(nodeId, id -> new NodeConnectionStatus(id, DisconnectionCode.NOT_YET_CONNECTED));
+ eventListeners.forEach(listener -> listener.onLocalNodeIdentifierSet(nodeId));
}
@Override
@@ -170,7 +287,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return localNodeId;
}
- private String getElectedActiveCoordinatorAddress() throws IOException {
+ private String getElectedActiveCoordinatorAddress() {
return leaderElectionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
}
@@ -185,11 +302,62 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
final NodeConnectionStatus proposedStatus = entry.getValue();
if (proposedStatus.getState() == NodeConnectionState.REMOVED) {
- nodeStatuses.remove(nodeId);
+ removeNode(nodeId);
} else {
- nodeStatuses.put(nodeId, proposedStatus);
+ updateNodeStatus(nodeId, proposedStatus, false);
}
}
+
+ storeState();
+ }
+
+ /**
+  * Unconditionally drops the node's status and events. Listeners are notified only
+  * if a status was actually registered for the node.
+  *
+  * @param nodeId the node to remove
+  * @return the status that was registered for the node, or null if there was none
+  */
+ private NodeConnectionStatus removeNode(final NodeIdentifier nodeId) {
+     final NodeConnectionStatus previousStatus = nodeStatuses.remove(nodeId);
+     nodeEvents.remove(nodeId);
+
+     final boolean wasKnown = previousStatus != null;
+     if (wasKnown) {
+         onNodeRemoved(nodeId);
+     }
+
+     return previousStatus;
+ }
+
+ /**
+  * Removes the node only if its currently registered status equals the expected one.
+  * On success the node's events are dropped and listeners are notified.
+  *
+  * @param nodeId the node to remove
+  * @param expectedStatus the status the node must currently have for the removal to happen
+  * @return true if the node was removed, false otherwise
+  */
+ private boolean removeNodeConditionally(final NodeIdentifier nodeId, final NodeConnectionStatus expectedStatus) {
+     if (!nodeStatuses.remove(nodeId, expectedStatus)) {
+         return false;
+     }
+
+     nodeEvents.remove(nodeId);
+     onNodeRemoved(nodeId);
+     return true;
+ }
+
+ /**
+  * Convenience overload that updates the node's status and requests that state be
+  * stored if the node turns out to be newly added.
+  *
+  * @param nodeId the node whose status is being set
+  * @param updatedStatus the new status
+  * @return the status previously registered for the node, or null if there was none
+  */
+ private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus) {
+     final boolean persistOnAdd = true;
+     return updateNodeStatus(nodeId, updatedStatus, persistOnAdd);
+ }
+
+ /**
+  * Unconditionally registers the given status for the node. If no status was registered
+  * before, the node is treated as newly added and {@code onNodeAdded} is invoked.
+  *
+  * @param nodeId the node whose status is being set
+  * @param updatedStatus the new status
+  * @param storeState passed through to {@code onNodeAdded} when the node is newly added
+  * @return the status previously registered for the node, or null if there was none
+  */
+ private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus, final boolean storeState) {
+     final NodeConnectionStatus previousStatus = nodeStatuses.put(nodeId, updatedStatus);
+
+     final boolean newlyAdded = previousStatus == null;
+     if (newlyAdded) {
+         onNodeAdded(nodeId, storeState);
+     }
+
+     return previousStatus;
+ }
+
+ /**
+  * Registers the updated status only if the node's current status matches the expectation.
+  * A null expectation means "no status registered yet": the status is inserted only if absent,
+  * and a successful insert is reported via {@code onNodeAdded}. Otherwise the current status
+  * is replaced only if it equals {@code expectedStatus}.
+  *
+  * @param nodeId the node whose status is being set
+  * @param expectedStatus the status the node is expected to have, or null if none is expected
+  * @param updatedStatus the new status
+  * @return true if the status map was changed, false otherwise
+  */
+ private boolean updateNodeStatusConditionally(final NodeIdentifier nodeId, final NodeConnectionStatus expectedStatus, final NodeConnectionStatus updatedStatus) {
+     if (expectedStatus != null) {
+         return nodeStatuses.replace(nodeId, expectedStatus, updatedStatus);
+     }
+
+     final boolean inserted = nodeStatuses.putIfAbsent(nodeId, updatedStatus) == null;
+     if (inserted) {
+         onNodeAdded(nodeId, true);
+     }
+
+     return inserted;
}
@Override
@@ -228,17 +396,21 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
if (currentStatus == null) {
if (newStatus.getState() == NodeConnectionState.REMOVED) {
- return nodeStatuses.remove(nodeId, currentStatus);
+ return removeNodeConditionally(nodeId, currentStatus);
} else {
- final NodeConnectionStatus existingValue = nodeStatuses.putIfAbsent(nodeId, newStatus);
- return existingValue == null;
+ return updateNodeStatusConditionally(nodeId, null, newStatus);
}
}
if (newStatus.getState() == NodeConnectionState.REMOVED) {
- return nodeStatuses.remove(nodeId, currentStatus);
+ if (removeNodeConditionally(nodeId, currentStatus)) {
+ storeState();
+ return true;
+ } else {
+ return false;
+ }
} else {
- return nodeStatuses.replace(nodeId, currentStatus, newStatus);
+ return updateNodeStatusConditionally(nodeId, currentStatus, newStatus);
}
}
@@ -348,9 +520,23 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public void removeNode(final NodeIdentifier nodeId, final String userDn) {
reportEvent(nodeId, Severity.INFO, "User " + userDn + " requested that node be removed from cluster");
- nodeStatuses.remove(nodeId);
- nodeEvents.remove(nodeId);
notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED));
+ removeNode(nodeId);
+
+ storeState();
+ }
+
+ /**
+  * Notifies every registered topology listener that the given node was removed.
+  *
+  * @param nodeId the node that was removed
+  */
+ private void onNodeRemoved(final NodeIdentifier nodeId) {
+     eventListeners.forEach(listener -> listener.onNodeRemoved(nodeId));
+ }
+
+ /**
+  * Optionally stores the cluster topology and then notifies every registered topology
+  * listener that the given node was added.
+  *
+  * @param nodeId the node that was added
+  * @param storeState whether to store state before notifying listeners
+  */
+ private void onNodeAdded(final NodeIdentifier nodeId, final boolean storeState) {
+     if (storeState) {
+         storeState();
+     }
+
+     eventListeners.forEach(listener -> listener.onNodeAdded(nodeId));
}
@Override
@@ -381,8 +567,18 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
@Override
- public boolean isBlockedByFirewall(final String hostname) {
- return firewall != null && !firewall.isPermissible(hostname);
+ public boolean isBlockedByFirewall(final Set<String> nodeIdentities) {
+ if (firewall == null) {
+ return false;
+ }
+
+ for (final String nodeId : nodeIdentities) {
+ if (firewall.isPermissible(nodeId)) {
+ return false;
+ }
+ }
+
+ return true;
}
@Override
@@ -455,28 +651,21 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
private NodeIdentifier getElectedActiveCoordinatorNode(final boolean warnOnError) {
- final String electedNodeAddress;
+ String electedNodeAddress;
try {
electedNodeAddress = getElectedActiveCoordinatorAddress();
} catch (final NoClusterCoordinatorException ncce) {
logger.debug("There is currently no elected active Cluster Coordinator");
return null;
- } catch (final IOException ioe) {
- if (warnOnError) {
- logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently: " + ioe);
- if (logger.isDebugEnabled()) {
- logger.warn("", ioe);
- }
- }
-
- return null;
}
- if (electedNodeAddress == null) {
+ if (electedNodeAddress == null || electedNodeAddress.trim().isEmpty()) {
logger.debug("There is currently no elected active Cluster Coordinator");
return null;
}
+ electedNodeAddress = electedNodeAddress.trim();
+
final int colonLoc = electedNodeAddress.indexOf(':');
if (colonLoc < 1) {
if (warnOnError) {
@@ -519,6 +708,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
final NodeConnectionStatus existingStatus = this.nodeStatuses.putIfAbsent(connectionStatus.getNodeIdentifier(), connectionStatus);
if (existingStatus == null) {
+ onNodeAdded(connectionStatus.getNodeIdentifier(), true);
return connectionStatus.getNodeIdentifier();
} else {
return existingStatus.getNodeIdentifier();
@@ -594,7 +784,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
// this method is called when something occurs that causes this node to change the status of the
// node in question. We only use comparisons against the current value when we receive an update
// about a node status from a different node, since those may be received out-of-order.
- final NodeConnectionStatus currentStatus = nodeStatuses.put(nodeId, status);
+ final NodeConnectionStatus currentStatus = updateNodeStatus(nodeId, status);
final NodeConnectionState currentState = currentStatus == null ? null : currentStatus.getState();
logger.info("Status of {} changed from {} to {}", nodeId, currentStatus, status);
logger.debug("State of cluster nodes is now {}", nodeStatuses);
@@ -741,10 +931,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
@Override
- public ProtocolMessage handle(final ProtocolMessage protocolMessage) throws ProtocolException {
+ public ProtocolMessage handle(final ProtocolMessage protocolMessage, final Set<String> nodeIdentities) throws ProtocolException {
switch (protocolMessage.getType()) {
case CONNECTION_REQUEST:
- return handleConnectionRequest((ConnectionRequestMessage) protocolMessage);
+ return handleConnectionRequest((ConnectionRequestMessage) protocolMessage, nodeIdentities);
case NODE_STATUS_CHANGE:
handleNodeStatusChange((NodeStatusChangeMessage) protocolMessage);
return null;
@@ -790,9 +980,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
// Either remove the value from the map or update the map depending on the connection state
if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) {
- nodeStatuses.remove(nodeId, oldStatus);
+ if (removeNodeConditionally(nodeId, oldStatus)) {
+ storeState();
+ }
} else {
- nodeStatuses.put(nodeId, updatedStatus);
+ updateNodeStatus(nodeId, updatedStatus);
}
logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
@@ -838,6 +1030,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
// there is no node with that ID
resolvedNodeId = proposedIdentifier;
logger.debug("No existing node with ID {}; resolved node ID is as-proposed", proposedIdentifier.getId());
+ onNodeAdded(resolvedNodeId, true);
} else if (existingStatus.getNodeIdentifier().logicallyEquals(proposedIdentifier)) {
// there is a node with that ID but it's the same node.
resolvedNodeId = proposedIdentifier;
@@ -854,28 +1047,37 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return resolvedNodeId;
}
- private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage) {
+ private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage, final Set<String> nodeIdentities) {
final NodeIdentifier proposedIdentifier = requestMessage.getConnectionRequest().getProposedNodeIdentifier();
- final NodeIdentifier withRequestorDn = addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN());
+ final NodeIdentifier withNodeIdentities = addNodeIdentities(proposedIdentifier, nodeIdentities);
final DataFlow dataFlow = requestMessage.getConnectionRequest().getDataFlow();
- final ConnectionRequest requestWithDn = new ConnectionRequest(withRequestorDn, dataFlow);
+ final ConnectionRequest requestWithNodeIdentities = new ConnectionRequest(withNodeIdentities, dataFlow);
// Resolve Node identifier.
final NodeIdentifier resolvedNodeId = resolveNodeId(proposedIdentifier);
+ if (isBlockedByFirewall(nodeIdentities)) {
+ // if none of the node's identities is permitted by the firewall, respond with a "blocked by firewall" message
+ logger.info("Firewall blocked connection request from node " + resolvedNodeId + " with Node Identities " + nodeIdentities);
+ final ConnectionResponse response = ConnectionResponse.createBlockedByFirewallResponse();
+ final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
+ responseMessage.setConnectionResponse(response);
+ return responseMessage;
+ }
+
if (requireElection) {
- final DataFlow electedDataFlow = flowElection.castVote(dataFlow, withRequestorDn);
+ final DataFlow electedDataFlow = flowElection.castVote(dataFlow, withNodeIdentities);
if (electedDataFlow == null) {
- logger.info("Received Connection Request from {}; responding with Flow Election In Progress message", withRequestorDn);
+ logger.info("Received Connection Request from {}; responding with Flow Election In Progress message", withNodeIdentities);
return createFlowElectionInProgressResponse();
} else {
- logger.info("Received Connection Request from {}; responding with DataFlow that was elected", withRequestorDn);
- return createConnectionResponse(requestWithDn, resolvedNodeId, electedDataFlow);
+ logger.info("Received Connection Request from {}; responding with DataFlow that was elected", withNodeIdentities);
+ return createConnectionResponse(requestWithNodeIdentities, resolvedNodeId, electedDataFlow);
}
}
- logger.info("Received Connection Request from {}; responding with my DataFlow", withRequestorDn);
- return createConnectionResponse(requestWithDn, resolvedNodeId);
+ logger.info("Received Connection Request from {}; responding with my DataFlow", withNodeIdentities);
+ return createConnectionResponse(requestWithNodeIdentities, resolvedNodeId);
}
private ConnectionResponseMessage createFlowElectionInProgressResponse() {
@@ -901,15 +1103,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
private ConnectionResponseMessage createConnectionResponse(final ConnectionRequest request, final NodeIdentifier resolvedNodeIdentifier, final DataFlow clusterDataFlow) {
- if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
- // if the socket address is not listed in the firewall, then return a null response
- logger.info("Firewall blocked connection request from node " + resolvedNodeIdentifier);
- final ConnectionResponse response = ConnectionResponse.createBlockedByFirewallResponse();
- final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
- responseMessage.setConnectionResponse(response);
- return responseMessage;
- }
-
if (clusterDataFlow == null) {
final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
responseMessage.setConnectionResponse(new ConnectionResponse(5, "The cluster dataflow is not yet available"));
@@ -936,11 +1129,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
- private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) {
+ private NodeIdentifier addNodeIdentities(final NodeIdentifier nodeId, final Set<String> nodeIdentities) {
return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(),
nodeId.getSocketAddress(), nodeId.getSocketPort(),
+ nodeId.getLoadBalanceAddress(), nodeId.getLoadBalancePort(),
nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(),
- nodeId.getSiteToSiteHttpApiPort(), nodeId.isSiteToSiteSecure(), dn);
+ nodeId.getSiteToSiteHttpApiPort(), nodeId.isSiteToSiteSecure(), nodeIdentities);
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/state/NodeIdentifierDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/state/NodeIdentifierDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/state/NodeIdentifierDescriptor.java
new file mode 100644
index 0000000..eeabbe3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/state/NodeIdentifierDescriptor.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node.state;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Mutable, bean-style description of a {@link NodeIdentifier}, extended with a flag that
+ * records whether the identifier belongs to the local node. Instances can be converted
+ * from and to {@link NodeIdentifier} via {@link #fromNodeIdentifier(NodeIdentifier, boolean)}
+ * and {@link #toNodeIdentifier()}.
+ */
+public class NodeIdentifierDescriptor {
+ private String id;
+ private String apiAddress;
+ private int apiPort;
+ private String socketAddress;
+ private int socketPort;
+ private String loadBalanceAddress;
+ private int loadBalancePort;
+ private String siteToSiteAddress;
+ private Integer siteToSitePort;
+ private Integer siteToSiteHttpApiPort;
+ private Boolean siteToSiteSecure;
+ private Set<String> nodeIdentities;
+ private boolean localNodeIdentifier;
+
+ public String getId() {
+     return id;
+ }
+
+ public void setId(final String id) {
+     this.id = id;
+ }
+
+ public String getApiAddress() {
+     return apiAddress;
+ }
+
+ public void setApiAddress(final String apiAddress) {
+     this.apiAddress = apiAddress;
+ }
+
+ public int getApiPort() {
+     return apiPort;
+ }
+
+ public void setApiPort(final int apiPort) {
+     this.apiPort = apiPort;
+ }
+
+ public String getSocketAddress() {
+     return socketAddress;
+ }
+
+ public void setSocketAddress(final String socketAddress) {
+     this.socketAddress = socketAddress;
+ }
+
+ public int getSocketPort() {
+     return socketPort;
+ }
+
+ public void setSocketPort(final int socketPort) {
+     this.socketPort = socketPort;
+ }
+
+ public String getLoadBalanceAddress() {
+     return loadBalanceAddress;
+ }
+
+ public void setLoadBalanceAddress(final String loadBalanceAddress) {
+     this.loadBalanceAddress = loadBalanceAddress;
+ }
+
+ public int getLoadBalancePort() {
+     return loadBalancePort;
+ }
+
+ public void setLoadBalancePort(final int loadBalancePort) {
+     this.loadBalancePort = loadBalancePort;
+ }
+
+ public String getSiteToSiteAddress() {
+     return siteToSiteAddress;
+ }
+
+ public void setSiteToSiteAddress(final String siteToSiteAddress) {
+     this.siteToSiteAddress = siteToSiteAddress;
+ }
+
+ public Integer getSiteToSitePort() {
+     return siteToSitePort;
+ }
+
+ public void setSiteToSitePort(final Integer siteToSitePort) {
+     this.siteToSitePort = siteToSitePort;
+ }
+
+ public Integer getSiteToSiteHttpApiPort() {
+     return siteToSiteHttpApiPort;
+ }
+
+ public void setSiteToSiteHttpApiPort(final Integer siteToSiteHttpApiPort) {
+     this.siteToSiteHttpApiPort = siteToSiteHttpApiPort;
+ }
+
+ public Boolean getSiteToSiteSecure() {
+     return siteToSiteSecure;
+ }
+
+ public void setSiteToSiteSecure(final Boolean siteToSiteSecure) {
+     this.siteToSiteSecure = siteToSiteSecure;
+ }
+
+ public Set<String> getNodeIdentities() {
+     return nodeIdentities;
+ }
+
+ /**
+  * Stores an unmodifiable copy of the given identities so that later changes to the
+  * caller's set are not reflected here. A null argument is stored as an empty set
+  * instead of failing with a NullPointerException.
+  *
+  * @param nodeIdentities the identities of the node; may be null
+  */
+ public void setNodeIdentities(final Set<String> nodeIdentities) {
+     if (nodeIdentities == null) {
+         this.nodeIdentities = Collections.<String>emptySet();
+         return;
+     }
+
+     this.nodeIdentities = Collections.unmodifiableSet(new HashSet<>(nodeIdentities));
+ }
+
+ public boolean isLocalNodeIdentifier() {
+     return localNodeIdentifier;
+ }
+
+ public void setLocalNodeIdentifier(final boolean localNodeIdentifier) {
+     this.localNodeIdentifier = localNodeIdentifier;
+ }
+
+ /**
+  * Creates a descriptor that mirrors every property of the given node identifier.
+  *
+  * @param nodeId the node identifier to copy from
+  * @param localNodeId whether the identifier is that of the local node
+  * @return a newly populated descriptor
+  */
+ public static NodeIdentifierDescriptor fromNodeIdentifier(final NodeIdentifier nodeId, final boolean localNodeId) {
+     final NodeIdentifierDescriptor descriptor = new NodeIdentifierDescriptor();
+     descriptor.setId(nodeId.getId());
+     descriptor.setApiAddress(nodeId.getApiAddress());
+     descriptor.setApiPort(nodeId.getApiPort());
+     descriptor.setSocketAddress(nodeId.getSocketAddress());
+     descriptor.setSocketPort(nodeId.getSocketPort());
+     descriptor.setSiteToSiteAddress(nodeId.getSiteToSiteAddress());
+     descriptor.setSiteToSitePort(nodeId.getSiteToSitePort());
+     descriptor.setSiteToSiteHttpApiPort(nodeId.getSiteToSiteHttpApiPort());
+     descriptor.setSiteToSiteSecure(nodeId.isSiteToSiteSecure());
+     descriptor.setNodeIdentities(nodeId.getNodeIdentities());
+     descriptor.setLoadBalanceAddress(nodeId.getLoadBalanceAddress());
+     descriptor.setLoadBalancePort(nodeId.getLoadBalancePort());
+     descriptor.setLocalNodeIdentifier(localNodeId);
+     return descriptor;
+ }
+
+ /**
+  * Builds a {@link NodeIdentifier} from the values held by this descriptor. The
+  * local-node flag is not part of the resulting identifier.
+  *
+  * @return a new node identifier
+  */
+ public NodeIdentifier toNodeIdentifier() {
+     return new NodeIdentifier(getId(), getApiAddress(), getApiPort(), getSocketAddress(), getSocketPort(), getLoadBalanceAddress(), getLoadBalancePort(),
+         getSiteToSiteAddress(), getSiteToSitePort(), getSiteToSiteHttpApiPort(), getSiteToSiteSecure(), getNodeIdentities());
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
index 7e3bc5d..a25fb4b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
@@ -17,10 +17,12 @@
package org.apache.nifi.cluster.manager;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import java.util.Map;
+import java.util.Objects;
public class ConnectionEntityMerger implements ComponentEntityMerger<ConnectionEntity>, ComponentEntityStatusMerger<ConnectionStatusDTO> {
@@ -33,6 +35,22 @@ public class ConnectionEntityMerger implements ComponentEntityMerger<ConnectionE
mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey());
}
}
+
+ // If Load Balancing is configured but client entity indicates that data is not being transferred, we need to check if any other
+ // node is actively transferring data. If Client Entity is transferring data, we already know the correct value for the Status,
+ // and if the Connection is not configured for Load Balancing, then we also know the correct value, so no need to look at all of
+ // the values of the other nodes.
+ if (clientEntity.getComponent() != null && ConnectionDTO.LOAD_BALANCE_INACTIVE.equals(clientEntity.getComponent().getLoadBalanceStatus())) {
+ final boolean anyActive = entityMap.values().stream()
+ .map(ConnectionEntity::getComponent)
+ .filter(Objects::nonNull)
+ .map(ConnectionDTO::getLoadBalanceStatus)
+ .anyMatch(status -> status.equals(ConnectionDTO.LOAD_BALANCE_ACTIVE));
+
+ if (anyActive) {
+ clientEntity.getComponent().setLoadBalanceStatus(ConnectionDTO.LOAD_BALANCE_ACTIVE);
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java
index 1f163f4..826ecf7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java
@@ -17,22 +17,27 @@
package org.apache.nifi.cluster.manager;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
+import org.apache.nifi.web.api.dto.diagnostics.LocalQueuePartitionDTO;
import org.apache.nifi.web.api.dto.diagnostics.NodeJVMDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.diagnostics.RemoteQueuePartitionDTO;
import org.apache.nifi.web.api.dto.diagnostics.ThreadDumpDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<ProcessorDiagnosticsEntity> {
private final long componentStatusSnapshotMillis;
@@ -46,6 +51,11 @@ public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<P
final List<NodeJVMDiagnosticsSnapshotDTO> nodeJvmDiagnosticsSnapshots = new ArrayList<>(entityMap.size());
+ // Merge connection diagnostics
+ mergeConnectionDiagnostics(clientEntity, entityMap, entity -> entity.getComponent().getIncomingConnections());
+ mergeConnectionDiagnostics(clientEntity, entityMap, entity -> entity.getComponent().getOutgoingConnections());
+
+
// Merge the Processor Statuses and create a separate NodeJVMDiagnosticsSnapshotDTO for each. We do both of these
// together simply because we are already iterating over the entityMap and we have to create the Node-specific JVM diagnostics
// before we start merging the values, in the second iteration over the map.
@@ -99,7 +109,7 @@ public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<P
// Merge permissions on referenced controller services
final Map<String, ControllerServiceEntity> serviceEntityById = clientDto.getReferencedControllerServices().stream()
- .map(diagnosticsDto -> diagnosticsDto.getControllerService())
+ .map(ControllerServiceDiagnosticsDTO::getControllerService)
.collect(Collectors.toMap(ControllerServiceEntity::getId, Function.identity()));
for (final Map.Entry<NodeIdentifier, ProcessorDiagnosticsEntity> entry : entityMap.entrySet()) {
@@ -114,6 +124,129 @@ public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<P
}
}
}
+ }
+
+ /**
+  * Merges per-node connection diagnostics into the client entity. For every connection
+  * returned by {@code extractConnections}, each node's aggregate snapshot is tagged with
+  * that node's API address and port and collected by connection ID. The client entity's
+  * DTO for that connection then receives the collected node snapshots and a newly merged
+  * aggregate snapshot.
+  *
+  * @param clientEntity the entity whose connection diagnostics are updated in place
+  * @param entityMap the diagnostics reported by each node
+  * @param extractConnections selects which set of connections (e.g. incoming or outgoing) to merge
+  */
+ private void mergeConnectionDiagnostics(final ProcessorDiagnosticsEntity clientEntity, final Map<NodeIdentifier, ProcessorDiagnosticsEntity> entityMap,
+     final Function<ProcessorDiagnosticsEntity, Set<ConnectionDiagnosticsDTO>> extractConnections) {
+
+     final Map<String, List<ConnectionDiagnosticsSnapshotDTO>> snapshotByConnectionId = new HashMap<>();
+     final Map<String, ConnectionDiagnosticsDTO> connectionById = new HashMap<>();
+
+     for (final Map.Entry<NodeIdentifier, ProcessorDiagnosticsEntity> entry : entityMap.entrySet()) {
+         final NodeIdentifier nodeId = entry.getKey();
+         final ProcessorDiagnosticsEntity entity = entry.getValue();
+
+         final Set<ConnectionDiagnosticsDTO> connections = extractConnections.apply(entity);
+         if (connections == null) {
+             // This node reported no connections of this kind; nothing to merge for it.
+             continue;
+         }
+
+         for (final ConnectionDiagnosticsDTO connectionDiagnostics : connections) {
+             final String connectionId = connectionDiagnostics.getConnection().getId();
+             final ConnectionDiagnosticsSnapshotDTO snapshot = connectionDiagnostics.getAggregateSnapshot();
+
+             snapshot.setNodeIdentifier(nodeId.getApiAddress() + ":" + nodeId.getApiPort());
+
+             final List<ConnectionDiagnosticsSnapshotDTO> snapshots = snapshotByConnectionId.computeIfAbsent(connectionId, id -> new ArrayList<>());
+             snapshots.add(snapshot);
+
+             // Only the client entity's DTOs are updated, so remember just those.
+             if (entity == clientEntity) {
+                 connectionById.put(connectionId, connectionDiagnostics);
+             }
+         }
+     }
+
+     for (final Map.Entry<String, List<ConnectionDiagnosticsSnapshotDTO>> entry : snapshotByConnectionId.entrySet()) {
+         final String connectionId = entry.getKey();
+         final List<ConnectionDiagnosticsSnapshotDTO> snapshots = entry.getValue();
+
+         final ConnectionDiagnosticsDTO dto = connectionById.get(connectionId);
+         if (dto == null) {
+             // The connection was reported by some node but is not present on the client
+             // entity, so there is no DTO to attach the merged result to.
+             continue;
+         }
+
+         dto.setNodeSnapshots(snapshots);
+         dto.setAggregateSnapshot(mergeConnectionSnapshots(snapshots));
+     }
+ }
+
+
+
+ /**
+  * Combines the given per-node snapshots into a single aggregate snapshot. Totals and
+  * local-partition counters are summed; the "all penalized" flag is AND'ed and the
+  * "any penalized" flag is OR'ed across snapshots. Remote partitions are grouped by their
+  * node identifier and each group is merged into one partition. Snapshots that carry no
+  * local partition or no remote partitions contribute nothing to the respective part.
+  *
+  * @param snapshots the snapshots to merge
+  * @return a new aggregate snapshot
+  */
+ private ConnectionDiagnosticsSnapshotDTO mergeConnectionSnapshots(final List<ConnectionDiagnosticsSnapshotDTO> snapshots) {
+     final ConnectionDiagnosticsSnapshotDTO aggregate = new ConnectionDiagnosticsSnapshotDTO();
+
+     final Map<String, List<RemoteQueuePartitionDTO>> remotePartitionsByNodeId = new HashMap<>();
+
+     final LocalQueuePartitionDTO localPartition = new LocalQueuePartitionDTO();
+     localPartition.setActiveQueueByteCount(0);
+     localPartition.setActiveQueueFlowFileCount(0);
+     localPartition.setAllActiveQueueFlowFilesPenalized(true); // set to true because we will update this value by AND'ing it with the snapshot value
+     localPartition.setAnyActiveQueueFlowFilesPenalized(false); // set to false because we will update this value by OR'ing it with the snapshot value
+     localPartition.setInFlightByteCount(0);
+     localPartition.setInFlightFlowFileCount(0);
+     localPartition.setSwapByteCount(0);
+     localPartition.setSwapFiles(0);
+     localPartition.setSwapFlowFileCount(0);
+     localPartition.setTotalByteCount(0);
+     localPartition.setTotalFlowFileCount(0);
+
+     aggregate.setTotalByteCount(0L);
+     aggregate.setTotalFlowFileCount(0);
+     aggregate.setLocalQueuePartition(localPartition);
+
+     for (final ConnectionDiagnosticsSnapshotDTO snapshot : snapshots) {
+         aggregate.setTotalByteCount(aggregate.getTotalByteCount() + snapshot.getTotalByteCount());
+         aggregate.setTotalFlowFileCount(aggregate.getTotalFlowFileCount() + snapshot.getTotalFlowFileCount());
+
+         final LocalQueuePartitionDTO snapshotLocalPartition = snapshot.getLocalQueuePartition();
+         if (snapshotLocalPartition != null) {
+             localPartition.setActiveQueueByteCount(localPartition.getActiveQueueByteCount() + snapshotLocalPartition.getActiveQueueByteCount());
+             localPartition.setActiveQueueFlowFileCount(localPartition.getActiveQueueFlowFileCount() + snapshotLocalPartition.getActiveQueueFlowFileCount());
+             localPartition.setAllActiveQueueFlowFilesPenalized(localPartition.getAllActiveQueueFlowFilesPenalized() && snapshotLocalPartition.getAllActiveQueueFlowFilesPenalized());
+             localPartition.setAnyActiveQueueFlowFilesPenalized(localPartition.getAnyActiveQueueFlowFilesPenalized() || snapshotLocalPartition.getAnyActiveQueueFlowFilesPenalized());
+             localPartition.setInFlightByteCount(localPartition.getInFlightByteCount() + snapshotLocalPartition.getInFlightByteCount());
+             localPartition.setInFlightFlowFileCount(localPartition.getInFlightFlowFileCount() + snapshotLocalPartition.getInFlightFlowFileCount());
+             localPartition.setSwapByteCount(localPartition.getSwapByteCount() + snapshotLocalPartition.getSwapByteCount());
+             localPartition.setSwapFiles(localPartition.getSwapFiles() + snapshotLocalPartition.getSwapFiles());
+             localPartition.setSwapFlowFileCount(localPartition.getSwapFlowFileCount() + snapshotLocalPartition.getSwapFlowFileCount());
+             localPartition.setTotalByteCount(localPartition.getTotalByteCount() + snapshotLocalPartition.getTotalByteCount());
+             localPartition.setTotalFlowFileCount(localPartition.getTotalFlowFileCount() + snapshotLocalPartition.getTotalFlowFileCount());
+         }
+
+         if (snapshot.getRemoteQueuePartitions() != null) {
+             for (final RemoteQueuePartitionDTO remoteQueuePartition : snapshot.getRemoteQueuePartitions()) {
+                 final String nodeId = remoteQueuePartition.getNodeIdentifier();
+                 final List<RemoteQueuePartitionDTO> partitionsForNodeId = remotePartitionsByNodeId.computeIfAbsent(nodeId, key -> new ArrayList<>());
+                 partitionsForNodeId.add(remoteQueuePartition);
+             }
+         }
+     }
+
+     final List<RemoteQueuePartitionDTO> mergedRemoteQueuePartitions = new ArrayList<>();
+     for (final List<RemoteQueuePartitionDTO> partitions : remotePartitionsByNodeId.values()) {
+         final RemoteQueuePartitionDTO merged = mergeRemoteQueuePartitions(partitions);
+         mergedRemoteQueuePartitions.add(merged);
+     }
+
+     aggregate.setRemoteQueuePartitions(mergedRemoteQueuePartitions);
+
+     return aggregate;
+ }
+
+ /**
+  * Sums the counters of the given remote partitions into a single new partition. The
+  * result carries the node identifier of the last partition in the list.
+  *
+  * @param partitions the partitions to combine
+  * @return a new partition holding the summed counters
+  */
+ private RemoteQueuePartitionDTO mergeRemoteQueuePartitions(final List<RemoteQueuePartitionDTO> partitions) {
+     final RemoteQueuePartitionDTO total = new RemoteQueuePartitionDTO();
+
+     // Start every counter at zero so the sums below have a defined starting value.
+     total.setTotalFlowFileCount(0);
+     total.setTotalByteCount(0);
+     total.setActiveQueueFlowFileCount(0);
+     total.setActiveQueueByteCount(0);
+     total.setInFlightFlowFileCount(0);
+     total.setInFlightByteCount(0);
+     total.setSwapFlowFileCount(0);
+     total.setSwapByteCount(0);
+     total.setSwapFiles(0);
+
+     for (final RemoteQueuePartitionDTO current : partitions) {
+         total.setNodeIdentifier(current.getNodeIdentifier());
+
+         total.setTotalFlowFileCount(total.getTotalFlowFileCount() + current.getTotalFlowFileCount());
+         total.setTotalByteCount(total.getTotalByteCount() + current.getTotalByteCount());
+         total.setActiveQueueFlowFileCount(total.getActiveQueueFlowFileCount() + current.getActiveQueueFlowFileCount());
+         total.setActiveQueueByteCount(total.getActiveQueueByteCount() + current.getActiveQueueByteCount());
+         total.setInFlightFlowFileCount(total.getInFlightFlowFileCount() + current.getInFlightFlowFileCount());
+         total.setInFlightByteCount(total.getInFlightByteCount() + current.getInFlightByteCount());
+         total.setSwapFlowFileCount(total.getSwapFlowFileCount() + current.getSwapFlowFileCount());
+         total.setSwapByteCount(total.getSwapByteCount() + current.getSwapByteCount());
+         total.setSwapFiles(total.getSwapFiles() + current.getSwapFiles());
+     }
+
+     return total;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
index 1915b9b..9c833b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
@@ -218,7 +218,7 @@ public class TestPopularVoteFlowElection {
}
private NodeIdentifier createNodeId(final int index) {
- return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + index, true);
+ return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + index, true);
}
private DataFlow createDataFlow(final byte[] flow) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 50bdd0d..6ea019d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -19,6 +19,7 @@ package org.apache.nifi.cluster.coordination.heartbeat;
import org.apache.nifi.cluster.ReportedEvent;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -55,7 +56,7 @@ public class TestAbstractHeartbeatMonitor {
@Before
public void setup() throws Exception {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties");
- nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", null, null, false);
+ nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", 777, "localhost", null, null, false);
}
@After
@@ -136,7 +137,7 @@ public class TestAbstractHeartbeatMonitor {
@Test
public void testDisconnectionOfTerminatedNodeDueToLackOfHeartbeat() throws Exception {
final NodeIdentifier nodeId1 = nodeId;
- final NodeIdentifier nodeId2 = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 7777, "localhost", 6666, "localhost", null, null, false);
+ final NodeIdentifier nodeId2 = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 7777, "localhost", 6666, "localhost", 5555, "localhost", null, null, false);
final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter();
final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
@@ -272,7 +273,7 @@ public class TestAbstractHeartbeatMonitor {
}
@Override
- public synchronized boolean isBlockedByFirewall(String hostname) {
+ public synchronized boolean isBlockedByFirewall(Set<String> nodeIds) {
return false;
}
@@ -369,6 +370,14 @@ public class TestAbstractHeartbeatMonitor {
public Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException {
return null;
}
+
+ @Override
+ public void registerEventListener(final ClusterTopologyEventListener eventListener) {
+ }
+
+ @Override
+ public void unregisterEventListener(final ClusterTopologyEventListener eventListener) {
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
index f2d3a24..75b7ae3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
@@ -64,7 +64,8 @@ class StandardHttpResponseMapperSpec extends Specification {
int n = it.node
def response = Mock(Response)
mockToRequestEntity.put response, it
- new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L, requestId)
+ new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L,
+ requestId)
} as Set
when:
@@ -102,7 +103,8 @@ class StandardHttpResponseMapperSpec extends Specification {
++n
def response = Mock(Response)
mockToRequestEntity.put response, it
- new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L, requestId)
+ new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'sktaddr', n * 11, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L,
+ requestId)
} as Set
when:
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java
index c1cfdf8..a93cd68 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java
@@ -39,7 +39,7 @@ public class CurrentUserEndpointMergerTest {
@Test
public void testMergeUserPermissions() {
- final NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", 9000, "localhost", 9001, "localhost", 9002, 9003, false);
+ final NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", 9000, "localhost", 9001, "localhost", 9006, "localhost", 9002, 9003, false);
final CurrentUserEntity userNode1 = new CurrentUserEntity();
userNode1.setControllerPermissions(buildPermissions(true, false));
userNode1.setCountersPermissions(buildPermissions(true, true));
@@ -55,7 +55,7 @@ public class CurrentUserEndpointMergerTest {
componentRestrictionsNode1.add(buildComponentRestriction(RequiredPermission.READ_FILESYSTEM, true, true));
userNode1.setComponentRestrictionPermissions(componentRestrictionsNode1);
- final NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false);
+ final NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", 8000, "localhost", 8001, "localhost", 9006,"localhost", 8002, 8003, false);
final CurrentUserEntity userNode2 = new CurrentUserEntity();
userNode2.setControllerPermissions(buildPermissions(false, true));
userNode2.setCountersPermissions(buildPermissions(true, false));
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy
index 232c562..104e69b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy
@@ -57,7 +57,8 @@ class StatusHistoryEndpointMergerSpec extends Specification {
++n
def response = Mock(Response)
mockToRequestEntity.put response, it
- new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L, requestId)
+ new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, null, n * 10, 'stsaddr', n * 100, n * 1000, false, null),
+ "GET", requestUri, response, 500L, requestId)
} as Set
when: