You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:34 UTC
[11/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
new file mode 100644
index 0000000..84565da
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.node;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a connected flow controller. Nodes always have an immutable
+ * identifier and a status. The status may be changed, but never null.
+ *
+ * A Node may be cloned, but the cloning is a shallow copy of the instance.
+ *
+ * This class overrides hashCode and equals and considers two instances to be
+ * equal if they have the equal NodeIdentifiers.
+ *
+ * @author unattributed
+ */
+public class Node implements Cloneable, Comparable<Node> {
+
+ private static final Logger lockLogger = LoggerFactory.getLogger("cluster.lock");
+
+ /**
+ * The semantics of a Node status are as follows:
+ * <ul>
+ * <li>CONNECTED -- a flow controller that is connected to the cluster. A
+ * connecting node transitions to connected after the cluster receives the
+ * flow controller's first heartbeat. A connected node can transition to
+ * disconnecting.</li>
+ * <li>CONNECTING -- a flow controller has issued a connection request to
+ * the cluster, but has not yet sent a heartbeat. A connecting node can
+ * transition to disconnecting or connected. The cluster will not accept any
+ * external requests to change the flow while any node is connecting.</li>
+ * <li>DISCONNECTED -- a flow controller that is not connected to the
+ * cluster. A disconnected node can transition to connecting.</li>
+ * <li>DISCONNECTING -- a flow controller that is in the process of
+ * disconnecting from the cluster. A disconnecting node will always
+ * transition to disconnected.</li>
+ * </ul>
+ */
+ public static enum Status {
+
+ CONNECTED,
+ CONNECTING,
+ DISCONNECTED,
+ DISCONNECTING
+ }
+
+ /**
+ * the node's unique identifier
+ */
+ private final NodeIdentifier nodeId;
+
+ /**
+ * the node statue
+ */
+ private Status status;
+
+ /**
+ * the last heartbeat received by from the node
+ */
+ private Heartbeat lastHeartbeat;
+
+ /**
+ * the payload of the last heartbeat received from the node
+ */
+ private HeartbeatPayload lastHeartbeatPayload;
+
+ /**
+ * the last time the connection for this node was requested
+ */
+ private AtomicLong connectionRequestedTimestamp = new AtomicLong(0L);
+
+ /**
+ * a flag to indicate this node was disconnected because of a lack of
+ * heartbeat
+ */
+ private boolean heartbeatDisconnection;
+
+ public Node(final NodeIdentifier id, final Status status) {
+ if (id == null) {
+ throw new IllegalArgumentException("ID may not be null.");
+ } else if (status == null) {
+ throw new IllegalArgumentException("Status may not be null.");
+ }
+ this.nodeId = id;
+ this.status = status;
+ }
+
+ public NodeIdentifier getNodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Returns the last received heartbeat or null if no heartbeat has been set.
+ *
+ * @return a heartbeat or null
+ */
+ public Heartbeat getHeartbeat() {
+ return lastHeartbeat;
+ }
+
+ public HeartbeatPayload getHeartbeatPayload() {
+ return lastHeartbeatPayload;
+ }
+
+ /**
+ * Sets the last heartbeat received.
+ *
+ * @param heartbeat a heartbeat
+ *
+ * @throws ProtocolException if the heartbeat's payload failed unmarshalling
+ */
+ public void setHeartbeat(final Heartbeat heartbeat) throws ProtocolException {
+ this.lastHeartbeat = heartbeat;
+ if (this.lastHeartbeat == null) {
+ this.lastHeartbeatPayload = null;
+ } else {
+ final byte[] payload = lastHeartbeat.getPayload();
+ if (payload == null || payload.length == 0) {
+ this.lastHeartbeatPayload = null;
+ } else {
+ this.lastHeartbeatPayload = HeartbeatPayload.unmarshal(payload);
+ }
+ }
+ }
+
+ /**
+ * Returns the time of the last received connection request for this node.
+ *
+ * @return the time when the connection request for this node was received.
+ */
+ public long getConnectionRequestedTimestamp() {
+ return connectionRequestedTimestamp.get();
+ }
+
+ /**
+ * Sets the time when the connection request for this node was last
+ * received.
+ *
+ * This method is thread-safe and may be called without obtaining any lock.
+ *
+ * @param connectionRequestedTimestamp
+ */
+ public void setConnectionRequestedTimestamp(long connectionRequestedTimestamp) {
+ this.connectionRequestedTimestamp.set(connectionRequestedTimestamp);
+ }
+
+ /**
+ * Returns true if the node was disconnected due to lack of heartbeat; false
+ * otherwise.
+ *
+ * @return true if the node was disconnected due to lack of heartbeat; false
+ * otherwise.
+ */
+ public boolean isHeartbeatDisconnection() {
+ return heartbeatDisconnection;
+ }
+
+ /**
+ * Sets the status to disconnected and flags the node as being disconnected
+ * by lack of heartbeat.
+ */
+ public void setHeartbeatDisconnection() {
+ setStatus(Status.DISCONNECTED);
+ heartbeatDisconnection = true;
+ }
+
+ /**
+ * @return the status
+ */
+ public Status getStatus() {
+ return status;
+ }
+
+ /**
+ * @param status a status
+ */
+ public void setStatus(final Status status) {
+ if (status == null) {
+ throw new IllegalArgumentException("Status may not be null.");
+ }
+ this.status = status;
+ heartbeatDisconnection = false;
+ }
+
+ @Override
+ public Node clone() {
+ final Node clone = new Node(nodeId, status);
+ clone.lastHeartbeat = lastHeartbeat;
+ clone.lastHeartbeatPayload = lastHeartbeatPayload;
+ clone.heartbeatDisconnection = heartbeatDisconnection;
+ clone.connectionRequestedTimestamp = connectionRequestedTimestamp;
+ return clone;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final Node other = (Node) obj;
+ if (this.nodeId != other.nodeId && (this.nodeId == null || !this.nodeId.equals(other.nodeId))) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 7;
+ hash = 53 * hash + (this.nodeId != null ? this.nodeId.hashCode() : 0);
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return nodeId.toString();
+ }
+
+ @Override
+ public int compareTo(final Node o) {
+ if (o == null) {
+ return -1;
+ }
+ return getNodeId().getId().compareTo(o.getNodeId().getId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
new file mode 100644
index 0000000..e26d196
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.spring;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery;
+import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
+ * Factory bean for creating a singleton ClusterManagerProtocolServiceLocator
+ * instance. If the application is configured to act as the cluster manager,
+ * then null is always returned as the created instance.
+ *
+ * The cluster manager protocol service represents the socket endpoint for
+ * sending internal socket messages to the cluster manager.
+ */
+public class ClusterManagerProtocolServiceLocatorFactoryBean implements FactoryBean, ApplicationContextAware, DisposableBean {
+
+ private ApplicationContext applicationContext;
+
+ private ClusterServiceLocator locator;
+
+ private NiFiProperties properties;
+
+ @Override
+ public Object getObject() throws Exception {
+ /*
+ * If configured for the cluster manager, then the service locator is never used.
+ */
+ if (properties.isClusterManager()) {
+ return null;
+ } else if (locator == null) {
+
+ if (properties.getClusterProtocolUseMulticast()) {
+
+ // get the service discovery instance
+ final ClusterServiceDiscovery serviceDiscovery = applicationContext.getBean("clusterManagerProtocolServiceDiscovery", ClusterServiceDiscovery.class);
+
+ // create service location configuration
+ final ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
+ config.setNumAttempts(properties.getClusterProtocolMulticastServiceLocatorAttempts());
+
+ final int delay = (int) FormatUtils.getTimeDuration(properties.getClusterProtocolMulticastServiceLocatorAttemptsDelay(), TimeUnit.SECONDS);
+ config.setTimeBetweenAttempts(delay);
+ config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
+
+ locator = new ClusterServiceLocator(serviceDiscovery);
+ locator.setAttemptsConfig(config);
+
+ } else {
+ final String serviceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class);
+ final InetSocketAddress serviceAddress = properties.getClusterNodeUnicastManagerProtocolAddress();
+ final DiscoverableService service = new DiscoverableServiceImpl(serviceName, serviceAddress);
+ locator = new ClusterServiceLocator(service);
+ }
+
+ // start the locator
+ locator.start();
+
+ }
+ return locator;
+
+ }
+
+ @Override
+ public Class getObjectType() {
+ return ClusterServiceLocator.class;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ if (locator != null && locator.isRunning()) {
+ locator.stop();
+ }
+ }
+
+ public void setProperties(NiFiProperties properties) {
+ this.properties = properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
new file mode 100644
index 0000000..ef72298
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.spring;
+
+import java.io.File;
+import org.apache.nifi.cluster.firewall.impl.FileBasedClusterNodeFirewall;
+import org.apache.nifi.util.NiFiProperties;
+import org.springframework.beans.factory.FactoryBean;
+
+/**
+ * Factory bean for creating a singleton FileBasedClusterNodeFirewall instance.
+ */
+public class FileBasedClusterNodeFirewallFactoryBean implements FactoryBean {
+
+ private FileBasedClusterNodeFirewall firewall;
+
+ private NiFiProperties properties;
+
+ @Override
+ public Object getObject() throws Exception {
+ if (firewall == null) {
+ final File config = properties.getClusterManagerNodeFirewallFile();
+ final File restoreDirectory = properties.getRestoreDirectory();
+ if (config != null) {
+ firewall = new FileBasedClusterNodeFirewall(config, restoreDirectory);
+ }
+ }
+ return firewall;
+ }
+
+ @Override
+ public Class getObjectType() {
+ return FileBasedClusterNodeFirewall.class;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ public void setProperties(NiFiProperties properties) {
+ this.properties = properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
new file mode 100644
index 0000000..7169730
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.spring;
+
+import java.nio.file.Paths;
+import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.cluster.event.EventManager;
+import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
+import org.apache.nifi.cluster.flow.DataFlowManagementService;
+import org.apache.nifi.cluster.manager.HttpRequestReplicator;
+import org.apache.nifi.cluster.manager.HttpResponseMapper;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
+import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
+import org.apache.nifi.controller.service.ControllerServiceLoader;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import org.apache.nifi.util.NiFiProperties;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
+ * Factory bean for creating a singleton WebClusterManager instance. If the
+ * application is not configured to act as the cluster manager, then null is
+ * always returned as the created instance.
+ */
+public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationContextAware {
+
+ private ApplicationContext applicationContext;
+
+ private WebClusterManager clusterManager;
+
+ private NiFiProperties properties;
+
+ private StringEncryptor encryptor;
+
+ @Override
+ public Object getObject() throws Exception {
+ if (properties.isClusterManager() && properties.isNode()) {
+ throw new IllegalStateException("Application may be configured as a cluster manager or a node, but not both.");
+ } else if (!properties.isClusterManager()) {
+ /*
+ * If not configured for the cluster manager, then the cluster manager is never used.
+ * null is returned so that we don't instantiate a thread pool or other resources.
+ */
+ return null;
+ } else if (clusterManager == null) {
+
+ // get the service configuration path (fail early)
+ final String serviceConfigurationFile = properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE);
+ if (serviceConfigurationFile == null) {
+ throw new NullPointerException("The service configuration file has not been specified.");
+ }
+
+ final HttpRequestReplicator requestReplicator = applicationContext.getBean("httpRequestReplicator", HttpRequestReplicator.class);
+ final HttpResponseMapper responseMapper = applicationContext.getBean("httpResponseMapper", HttpResponseMapper.class);
+ final DataFlowManagementService dataFlowService = applicationContext.getBean("dataFlowManagementService", DataFlowManagementService.class);
+ final ClusterManagerProtocolSenderListener senderListener = applicationContext.getBean("clusterManagerProtocolSenderListener", ClusterManagerProtocolSenderListener.class);
+
+ // create the manager
+ clusterManager = new WebClusterManager(
+ requestReplicator,
+ responseMapper,
+ dataFlowService,
+ senderListener,
+ properties,
+ encryptor
+ );
+
+ // set the service broadcaster
+ if (properties.getClusterProtocolUseMulticast()) {
+
+ // create broadcaster
+ final ClusterServicesBroadcaster broadcaster = applicationContext.getBean("clusterServicesBroadcaster", ClusterServicesBroadcaster.class);
+
+ // register the cluster manager protocol service
+ final String clusterManagerProtocolServiceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class);
+ final DiscoverableService clusterManagerProtocolService = new DiscoverableServiceImpl(clusterManagerProtocolServiceName, properties.getClusterManagerProtocolAddress());
+ broadcaster.addService(clusterManagerProtocolService);
+
+ clusterManager.setServicesBroadcaster(broadcaster);
+ }
+
+ // set the event manager
+ clusterManager.setEventManager(applicationContext.getBean("nodeEventHistoryManager", EventManager.class));
+
+ // set the cluster firewall
+ clusterManager.setClusterFirewall(applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class));
+
+ // set the audit service
+ clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class));
+
+ // load the controller services
+ final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile));
+ serviceLoader.loadControllerServices(clusterManager);
+ }
+ return clusterManager;
+ }
+
+ @Override
+ public Class getObjectType() {
+ return WebClusterManager.class;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ public void setProperties(NiFiProperties properties) {
+ this.properties = properties;
+ }
+
+ public void setEncryptor(final StringEncryptor encryptor) {
+ this.encryptor = encryptor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
new file mode 100644
index 0000000..1ed5b30
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.reporting;
+
+import org.apache.nifi.cluster.manager.impl.ClusteredReportingContext;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingTask;
+
+public class ClusteredReportingTaskNode extends AbstractReportingTaskNode {
+
+ private final EventAccess eventAccess;
+ private final BulletinRepository bulletinRepository;
+ private final ControllerServiceProvider serviceProvider;
+
+ public ClusteredReportingTaskNode(final ReportingTask reportingTask, final String id, final ProcessScheduler scheduler,
+ final EventAccess eventAccess, final BulletinRepository bulletinRepository, final ControllerServiceProvider serviceProvider,
+ final ValidationContextFactory validationContextFactory) {
+ super(reportingTask, id, serviceProvider, scheduler, validationContextFactory);
+
+ this.eventAccess = eventAccess;
+ this.bulletinRepository = bulletinRepository;
+ this.serviceProvider = serviceProvider;
+ }
+
+ @Override
+ public ReportingContext getReportingContext() {
+ return new ClusteredReportingContext(eventAccess, bulletinRepository, getProperties(), serviceProvider);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml b/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml
new file mode 100644
index 0000000..68c29bc
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- marked as lazy so that clustering beans are not created when applications runs in non-clustered mode -->
+<beans default-lazy-init="true"
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xmlns:aop="http://www.springframework.org/schema/aop"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
+ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
+ http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
+ http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd">
+
+ <!-- jersey client -->
+ <bean id="jersey-client" class="org.apache.nifi.web.util.WebUtils" factory-method="createClient">
+ <constructor-arg>
+ <bean class="com.sun.jersey.api.client.config.DefaultClientConfig"/>
+ </constructor-arg>
+ <constructor-arg>
+ <bean class="org.apache.nifi.framework.security.util.SslContextFactory" factory-method="createSslContext">
+ <constructor-arg ref="nifiProperties"/>
+ </bean>
+ </constructor-arg>
+ </bean>
+
+ <!-- http request replicator -->
+ <bean id="httpRequestReplicator" class="org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl">
+ <constructor-arg index="0">
+ <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiRequestThreads"/>
+ </constructor-arg>
+ <constructor-arg ref="jersey-client" index="1"/>
+ <constructor-arg index="2">
+ <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiConnectionTimeout"/>
+ </constructor-arg>
+ <constructor-arg index="3">
+ <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiReadTimeout"/>
+ </constructor-arg>
+ <property name="nodeProtocolScheme">
+ <bean factory-bean="nifiProperties" factory-method="getClusterProtocolManagerToNodeApiScheme"/>
+ </property>
+ </bean>
+
+ <!-- http response mapper -->
+ <bean id="httpResponseMapper" class="org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl"/>
+
+ <!-- cluster flow DAO -->
+ <bean id="dataFlowDao" class="org.apache.nifi.cluster.flow.impl.DataFlowDaoImpl">
+ <constructor-arg index="0">
+ <bean factory-bean="nifiProperties" factory-method="getFlowConfigurationFileDir"/>
+ </constructor-arg>
+ <constructor-arg index="1">
+ <bean factory-bean="nifiProperties" factory-method="getRestoreDirectory"/>
+ </constructor-arg>
+ <constructor-arg index="2">
+ <bean factory-bean="nifiProperties" factory-method="getAutoResumeState"/>
+ </constructor-arg>
+ </bean>
+
+ <!-- dataflow management service -->
+ <bean id="dataFlowManagementService" class="org.apache.nifi.cluster.flow.impl.DataFlowManagementServiceImpl">
+ <constructor-arg ref="dataFlowDao"/>
+ <constructor-arg ref="clusterManagerProtocolSender"/>
+ <property name="retrievalDelay">
+ <bean factory-bean="nifiProperties" factory-method="getClusterManagerFlowRetrievalDelay"/>
+ </property>
+ </bean>
+
+ <!-- node event history manager -->
+ <bean id="nodeEventHistoryManager" class="org.apache.nifi.cluster.event.impl.EventManagerImpl">
+ <constructor-arg>
+ <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeEventHistorySize"/>
+ </constructor-arg>
+ </bean>
+
+ <!-- cluster firewall -->
+ <bean id="clusterFirewall" class="org.apache.nifi.cluster.spring.FileBasedClusterNodeFirewallFactoryBean">
+ <property name="properties" ref="nifiProperties"/>
+ </bean>
+
+ <!-- cluster manager -->
+ <bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean">
+ <property name="properties" ref="nifiProperties"/>
+ <property name="encryptor" ref="stringEncryptor"/>
+ </bean>
+
+ <!-- discoverable services -->
+
+ <!-- cluster manager protocol discoverable service -->
+
+ <!-- service name for communicating with the cluster manager using sockets -->
+ <bean id="clusterManagerProtocolServiceName" class="java.lang.String">
+ <constructor-arg value="cluster-manager-protocol" />
+ </bean>
+
+ <!-- cluster manager protocol service discovery -->
+ <bean id="clusterManagerProtocolServiceDiscovery" class="org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery">
+ <constructor-arg ref="clusterManagerProtocolServiceName" index="0"/>
+ <constructor-arg index="1">
+ <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastAddress"/>
+ </constructor-arg>
+ <constructor-arg ref="protocolMulticastConfiguration" index="2"/>
+ <constructor-arg ref="protocolContext" index="3"/>
+ </bean>
+
+ <!-- cluster manager protocol service locator -->
+ <bean id="clusterManagerProtocolServiceLocator" class="org.apache.nifi.cluster.spring.ClusterManagerProtocolServiceLocatorFactoryBean">
+ <property name="properties" ref="nifiProperties"/>
+ </bean>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java
new file mode 100644
index 0000000..09ea44b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.event.impl;
+
+import org.apache.nifi.cluster.event.impl.EventManagerImpl;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.nifi.cluster.event.Event;
+import org.apache.nifi.cluster.event.Event.Category;
+import org.apache.nifi.cluster.event.EventManager;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * @author unattributed
+ */
+public class EventManagerImplTest {
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNonPositiveHistorySize() {
+ new EventManagerImpl(0);
+ }
+
+ @Test
+ public void testGetEventsUnknownSource() {
+ EventManager manager = new EventManagerImpl(1);
+ assertEquals(Collections.EMPTY_LIST, manager.getEvents("unknown value"));
+ }
+
+ @Test
+ public void testGetEvents() {
+
+ EventManager manager = new EventManagerImpl(2);
+
+ Event e1 = new Event("1", "Event1", Category.INFO, 0);
+ Event e2 = new Event("1", "Event2", Category.INFO, 1);
+
+ manager.addEvent(e1);
+ manager.addEvent(e2);
+
+ List<Event> events = manager.getEvents("1");
+
+ // assert newest to oldest
+ assertEquals(Arrays.asList(e2, e1), events);
+ }
+
+ @Test
+ public void testGetMostRecentEventUnknownSource() {
+ EventManager manager = new EventManagerImpl(1);
+ assertNull(manager.getMostRecentEvent("unknown value"));
+ }
+
+ @Test
+ public void testGetMostRecentEvent() {
+
+ EventManager manager = new EventManagerImpl(2);
+
+ Event e1 = new Event("1", "Event1", Category.INFO, 0);
+ Event e2 = new Event("1", "Event2", Category.INFO, 1);
+
+ manager.addEvent(e1);
+ manager.addEvent(e2);
+
+ // assert newest to oldest
+ assertEquals(e2, manager.getMostRecentEvent("1"));
+ }
+
+ @Test
+ public void testAddEventExceedsHistorySize() {
+
+ EventManager manager = new EventManagerImpl(1);
+
+ Event e1 = new Event("1", "Event1", Category.INFO, 0);
+ Event e2 = new Event("1", "Event2", Category.INFO, 1);
+
+ manager.addEvent(e1);
+ manager.addEvent(e2);
+
+ List<Event> events = manager.getEvents("1");
+
+ // assert oldest evicted
+ assertEquals(Arrays.asList(e2), events);
+
+ }
+
+ @Test
+ public void testClearHistory() {
+
+ EventManager manager = new EventManagerImpl(1);
+
+ Event e1 = new Event("1", "Event1", Category.INFO, 0);
+ Event e2 = new Event("1", "Event2", Category.INFO, 1);
+
+ manager.addEvent(e1);
+ manager.addEvent(e2);
+
+ manager.clearEventHistory("1");
+
+ // assert oldest evicted
+ assertTrue(manager.getEvents("1").isEmpty());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
new file mode 100644
index 0000000..2fcf7ef
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.firewall.impl;
+
+import org.apache.nifi.cluster.firewall.impl.FileBasedClusterNodeFirewall;
+import java.io.File;
+import java.io.IOException;
+import org.apache.nifi.file.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+public class FileBasedClusterNodeFirewallTest {
+
+ private FileBasedClusterNodeFirewall ipsFirewall;
+
+ private FileBasedClusterNodeFirewall acceptAllFirewall;
+
+ private File ipsConfig;
+
+ private File emptyConfig;
+
+ private File restoreDirectory;
+
+ @Before
+ public void setup() throws Exception {
+
+ ipsConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt");
+ emptyConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt");
+
+ restoreDirectory = new File(System.getProperty("java.io.tmpdir") + "/firewall_restore");
+
+ ipsFirewall = new FileBasedClusterNodeFirewall(ipsConfig, restoreDirectory);
+ acceptAllFirewall = new FileBasedClusterNodeFirewall(emptyConfig);
+ }
+
+ @After
+ public void teardown() throws IOException {
+ deleteFile(restoreDirectory);
+ }
+
+ @Test
+ public void testSyncWithRestore() {
+ assertEquals(ipsConfig.length(), new File(restoreDirectory, ipsConfig.getName()).length());
+ }
+
+ @Test
+ public void testIsPermissibleWithExactMatch() {
+ assertTrue(ipsFirewall.isPermissible("2.2.2.2"));
+ }
+
+ @Test
+ public void testIsPermissibleWithSubnetMatch() {
+ assertTrue(ipsFirewall.isPermissible("3.3.3.255"));
+ }
+
+ @Test
+ public void testIsPermissibleWithNoMatch() {
+ assertFalse(ipsFirewall.isPermissible("255.255.255.255"));
+ }
+
+ @Test
+ public void testIsPermissibleWithMalformedData() {
+ assertFalse(ipsFirewall.isPermissible("abc"));
+ }
+
+ @Test
+ public void testIsPermissibleWithEmptyConfig() {
+ assertTrue(acceptAllFirewall.isPermissible("1.1.1.1"));
+ }
+
+ @Test
+ public void testIsPermissibleWithEmptyConfigWithMalformedData() {
+ assertTrue(acceptAllFirewall.isPermissible("abc"));
+ }
+
+ private boolean deleteFile(final File file) {
+ if (file.isDirectory()) {
+ FileUtils.deleteFilesInDir(file, null, null, true, true);
+ }
+ return FileUtils.deleteFile(file, null, 10);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
new file mode 100644
index 0000000..6294dfc
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.flow.impl;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.nifi.cluster.flow.DataFlowDao;
+import org.apache.nifi.cluster.flow.PersistedFlowState;
+import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl;
+import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+/**
+ * @author unattributed
+ */
+public class DataFlowManagementServiceImplTest {
+
+ private DataFlowManagementServiceImpl service;
+ private File restoreLocation;
+ private File primaryLocation;
+ private DataFlowDao dao;
+ private int apiDummyPort;
+ private int socketPort;
+ private SocketConfiguration socketConfig;
+ private ClusterManagerProtocolSender sender;
+ private ServerSocketConfiguration serverSocketConfig;
+ private SocketProtocolListener listener;
+
+ @Before
+ public void setup() throws IOException {
+
+ primaryLocation = new File(System.getProperty("java.io.tmpdir") + "/primary");
+ restoreLocation = new File(System.getProperty("java.io.tmpdir") + "/restore");
+
+ FileUtils.deleteDirectory(primaryLocation);
+ FileUtils.deleteDirectory(restoreLocation);
+
+ ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+ socketConfig = new SocketConfiguration();
+ socketConfig.setSocketTimeout(1000);
+ serverSocketConfig = new ServerSocketConfiguration();
+
+ dao = new DataFlowDaoImpl(primaryLocation, restoreLocation, false);
+
+ sender = new ClusterManagerProtocolSenderImpl(socketConfig, protocolContext);
+
+ service = new DataFlowManagementServiceImpl(dao, sender);
+ service.start();
+
+ listener = new SocketProtocolListener(1, 0, serverSocketConfig, protocolContext);
+ listener.start();
+
+ apiDummyPort = 7777;
+ socketPort = listener.getPort();
+ }
+
+ @After
+ public void teardown() throws IOException {
+
+ if (service != null && service.isRunning()) {
+ service.stop();
+ }
+
+ if (listener != null && listener.isRunning()) {
+ try {
+ listener.stop();
+ } catch (final Exception ex) {
+ ex.printStackTrace(System.out);
+ }
+ }
+
+ }
+
+ @Test
+ public void testLoadFlowWithNonExistentFlow() throws ParserConfigurationException, SAXException, IOException {
+ verifyFlow();
+ }
+
+ @Test
+ public void testLoadFlowWithNonExistentFlowWhenServiceStopped() throws IOException, SAXException, ParserConfigurationException {
+ service.stop();
+ verifyFlow();
+ }
+
+ private void verifyFlow() throws ParserConfigurationException, SAXException, IOException {
+ final byte[] flowBytes = service.loadDataFlow().getDataFlow().getFlow();
+ final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+ final Document doc = docBuilder.parse(new ByteArrayInputStream(flowBytes));
+ final Element controller = (Element) doc.getElementsByTagName("flowController").item(0);
+ final Element rootGroup = (Element) controller.getElementsByTagName("rootGroup").item(0);
+ final String rootGroupName = rootGroup.getElementsByTagName("name").item(0).getTextContent();
+ assertEquals("NiFi Flow", rootGroupName);
+ }
+
+ @Test
+ public void testLoadFlowSingleNode() throws Exception {
+ String flowStr = "<rootGroup />";
+ byte[] flowBytes = flowStr.getBytes();
+ listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+
+ NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort);
+ service.setNodeIds(new HashSet<>(Arrays.asList(nodeId)));
+ service.setPersistedFlowState(PersistedFlowState.STALE);
+
+ assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+ // sleep long enough for the flow retriever to run
+ waitForState(PersistedFlowState.CURRENT);
+
+ assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
+ assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
+
+ }
+
+ @Test
+ public void testLoadFlowWithSameNodeIds() throws Exception {
+
+ String flowStr = "<rootGroup />";
+ listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+
+ NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort);
+ NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
+ service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
+ service.setPersistedFlowState(PersistedFlowState.STALE);
+
+ assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+ // sleep long enough for the flow retriever to run
+ waitForState(PersistedFlowState.CURRENT);
+
+ // verify that flow is current
+ assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
+
+ // add same ids in different order
+ service.setNodeIds(new HashSet<>(Arrays.asList(nodeId2, nodeId1)));
+
+ // verify flow is still current
+ assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
+
+ }
+
+ @Test
+ public void testLoadFlowWithABadNode() throws Exception {
+
+ String flowStr = "<rootGroup />";
+ byte[] flowBytes = flowStr.getBytes();
+ listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+
+ NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1);
+ NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
+ service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
+ service.setPersistedFlowState(PersistedFlowState.STALE);
+
+ assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+ // sleep long enough for the flow retriever to run
+ waitForState(PersistedFlowState.CURRENT);
+
+ assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
+ assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
+
+ }
+
+ @Test
+ public void testLoadFlowWithConstantNodeIdChanging() throws Exception {
+ String flowStr = "<rootGroup />";
+ byte[] flowBytes = flowStr.getBytes();
+ listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+
+ NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1);
+ NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
+
+ for (int i = 0; i < 1000; i++) {
+ service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
+ service.setPersistedFlowState(PersistedFlowState.STALE);
+ assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+ }
+
+ // sleep long enough for the flow retriever to run
+ waitForState(PersistedFlowState.CURRENT);
+
+ assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
+ assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
+ }
+
+ @Test
+ public void testLoadFlowWithConstantNodeIdChangingWithRetrievalDelay() throws Exception {
+
+ String flowStr = "<rootGroup />";
+ listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+
+ NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1);
+ NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
+
+ service.setRetrievalDelay("5 sec");
+ for (int i = 0; i < 1000; i++) {
+ service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
+ service.setPersistedFlowState(PersistedFlowState.STALE);
+ assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+ }
+
+ // sleep long enough for the flow retriever to run
+ waitForState(PersistedFlowState.STALE);
+
+ assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+ }
+
+ @Test
+ public void testStopRequestedWhileRetrieving() throws Exception {
+
+ String flowStr = "<rootGroup />";
+ listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+ Set<NodeIdentifier> nodeIds = new HashSet<>();
+ for (int i = 0; i < 1000; i++) {
+ nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1));
+ }
+ nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort));
+
+ long lastRetrievalTime = service.getLastRetrievalTime();
+
+ service.setNodeIds(nodeIds);
+ service.setPersistedFlowState(PersistedFlowState.STALE);
+ assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+ // sleep long enough for the flow retriever to run
+ waitForState(PersistedFlowState.STALE);
+
+ service.stop();
+
+ service.setPersistedFlowState(PersistedFlowState.STALE);
+ assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+ assertEquals(lastRetrievalTime, service.getLastRetrievalTime());
+
+ }
+
+ @Test
+ public void testLoadFlowUnknownState() throws Exception {
+
+ String flowStr = "<rootGroup />";
+ byte[] flowBytes = flowStr.getBytes();
+ listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+ NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort);
+
+ service.setNodeIds(new HashSet<>(Arrays.asList(nodeId)));
+ service.setPersistedFlowState(PersistedFlowState.STALE);
+ assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+ service.setPersistedFlowState(PersistedFlowState.UNKNOWN);
+
+ assertEquals(PersistedFlowState.UNKNOWN, service.getPersistedFlowState());
+
+ service.setPersistedFlowState(PersistedFlowState.STALE);
+ assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+ // sleep long enough for the flow retriever to run
+ waitForState(PersistedFlowState.CURRENT);
+
+ assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
+
+ }
+
+ private class FlowRequestProtocolHandler implements ProtocolHandler {
+
+ private StandardDataFlow dataFlow;
+
+ public FlowRequestProtocolHandler(final StandardDataFlow dataFlow) {
+ this.dataFlow = dataFlow;
+ }
+
+ @Override
+ public boolean canHandle(ProtocolMessage msg) {
+ return true;
+ }
+
+ @Override
+ public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+ FlowResponseMessage response = new FlowResponseMessage();
+ response.setDataFlow(dataFlow);
+ return response;
+ }
+
+ }
+
+ private void waitForState(PersistedFlowState state) throws InterruptedException {
+ for (int i = 0; i < 30; i++) {
+ if (service.getPersistedFlowState() == state) {
+ break;
+ } else {
+ Thread.sleep(1000);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
new file mode 100644
index 0000000..0c65aba
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MultivaluedMap;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Iterator;
+import javax.ws.rs.core.StreamingOutput;
+import org.apache.nifi.cluster.manager.testutils.HttpResponse;
+import org.apache.nifi.cluster.manager.testutils.HttpServer;
+import com.sun.jersey.api.client.Client;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.ws.rs.core.Response.Status;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.nifi.cluster.manager.testutils.HttpResponseAction;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import static org.junit.Assert.*;
+
+/**
+ * @author unattributed
+ */
+public class HttpRequestReplicatorImplTest {
+
+ private Client client;
+ private HttpRequestReplicatorImpl replicator;
+ private int executorThreadCount;
+ private int serverThreadCount;
+ private int serverPort;
+ private HttpServer server;
+ private Map<String, List<String>> expectedRequestParameters;
+ private Map<String, String> expectedRequestHeaders;
+ private Map<String, String> expectedResponseHeaders;
+ private Object expectedEntity;
+ private String expectedBody;
+ private URI prototypeUri;
+
+ @Before
+ public void setUp() throws IOException, URISyntaxException {
+
+ executorThreadCount = 5;
+ serverThreadCount = 3;
+
+ client = Client.create();
+
+ replicator = new HttpRequestReplicatorImpl(executorThreadCount, client, "1 sec", "1 sec");
+ replicator.start();
+
+ expectedRequestHeaders = new HashMap<>();
+ expectedRequestHeaders.put("header1", "header value1");
+ expectedRequestHeaders.put("header2", "header value2");
+
+ expectedRequestParameters = new HashMap<>();
+ expectedRequestParameters.put("param1", Arrays.asList("p value1"));
+ expectedRequestParameters.put("param2", Arrays.asList("p value2"));
+
+ expectedResponseHeaders = new HashMap<>();
+ expectedResponseHeaders.put("header1", "header value1");
+ expectedResponseHeaders.put("header2", "header value2");
+
+ expectedEntity = new Entity();
+
+ expectedBody = "some text";
+
+ prototypeUri = new URI("http://prototype.host/path/to/resource");
+
+ server = new HttpServer(serverThreadCount, 0);
+ server.start();
+ serverPort = server.getPort();
+ }
+
+ @After
+ public void teardown() {
+ if (server.isRunning()) {
+ server.stop();
+ }
+ if (replicator.isRunning()) {
+ replicator.stop();
+ }
+ }
+
+ @Test
+ public void testReplicateGetLessNodesThanReplicatorThreads() throws Throwable {
+ testReplicateXXX(executorThreadCount - 1, HttpMethod.GET);
+ }
+
+ @Test
+ public void testReplicateGetMoreNodesThanReplicatorThreads() throws Throwable {
+ testReplicateXXX(executorThreadCount + 1, HttpMethod.GET);
+ }
+
+ @Test
+ public void testReplicateGetWithUnresponsiveNode() throws Throwable {
+
+ // nodes
+ Set<NodeIdentifier> nodeIds = createNodes(2, "localhost", serverPort);
+
+ // response
+ HttpResponse expectedResponse = new HttpResponse(Status.OK, expectedBody);
+
+ // first response normal, second response slow
+ server.addResponseAction(new HttpResponseAction(expectedResponse));
+ server.addResponseAction(new HttpResponseAction(expectedResponse, 3500));
+
+ Set<NodeResponse> responses = replicator.replicate(
+ nodeIds,
+ HttpMethod.GET,
+ prototypeUri,
+ expectedRequestParameters,
+ expectedRequestHeaders);
+
+ assertEquals(nodeIds.size(), responses.size());
+
+ Iterator<NodeResponse> nodeResponseItr = responses.iterator();
+
+ NodeResponse firstResponse = nodeResponseItr.next();
+ NodeResponse secondResponse = nodeResponseItr.next();
+ NodeResponse goodResponse;
+ NodeResponse badResponse;
+ if (firstResponse.hasThrowable()) {
+ goodResponse = secondResponse;
+ badResponse = firstResponse;
+ } else {
+ goodResponse = firstResponse;
+ badResponse = secondResponse;
+ }
+
+ // good response
+ // check status
+ assertEquals(Status.OK.getStatusCode(), goodResponse.getStatus());
+
+ // check entity stream
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ((StreamingOutput) goodResponse.getResponse().getEntity()).write(baos);
+ assertEquals("some text", new String(baos.toByteArray()));
+
+ // bad response
+ assertTrue(badResponse.hasThrowable());
+ assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), badResponse.getStatus());
+
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testReplicateGetWithEntity() throws Throwable {
+ testReplicateXXXEntity(HttpMethod.GET);
+ }
+
+ @Test
+ public void testReplicatePost() throws Throwable {
+ testReplicateXXX(HttpMethod.POST);
+ }
+
+ @Test
+ public void testReplicatePostWithEntity() throws Throwable {
+ testReplicateXXXEntity(HttpMethod.POST);
+ }
+
+ @Test
+ public void testReplicatePut() throws Throwable {
+ testReplicateXXX(HttpMethod.PUT);
+ }
+
+ @Test
+ public void testReplicatePutWithEntity() throws Throwable {
+ testReplicateXXXEntity(HttpMethod.PUT);
+ }
+
+ @Test
+ public void testReplicateDelete() throws Throwable {
+ testReplicateXXX(HttpMethod.DELETE);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testReplicateDeleteWithEntity() throws Throwable {
+ testReplicateXXXEntity(HttpMethod.DELETE);
+ }
+
+ @Test
+ public void testReplicateHead() throws Throwable {
+ testReplicateXXX(HttpMethod.HEAD);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testReplicateHeadWithEntity() throws Throwable {
+ testReplicateXXXEntity(HttpMethod.HEAD);
+ }
+
+ @Test
+ public void testReplicateOptions() throws Throwable {
+ testReplicateXXX(HttpMethod.OPTIONS);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testReplicateOptionsWithEntity() throws Throwable {
+ testReplicateXXXEntity(HttpMethod.OPTIONS);
+ }
+
+ private void testReplicateXXX(final String method) throws Throwable {
+ testReplicateXXX(executorThreadCount, method);
+ }
+
+ private void testReplicateXXX(final int numNodes, final String method) throws Throwable {
+
+ // nodes
+ Set<NodeIdentifier> nodeIds = createNodes(numNodes, "localhost", serverPort);
+
+ // set up responses
+ for (int i = 0; i < nodeIds.size(); i++) {
+ HttpResponse response = new HttpResponse(Status.OK, expectedBody);
+ response.addHeaders(expectedResponseHeaders);
+ server.addResponseAction(new HttpResponseAction(response));
+ }
+
+ // setup request parameters
+ server.addCheckedParameters(expectedRequestParameters);
+
+ // request headers
+ server.addCheckedHeaders(expectedRequestHeaders);
+
+ Set<NodeResponse> responses = replicator.replicate(
+ nodeIds,
+ method,
+ prototypeUri,
+ expectedRequestParameters,
+ expectedRequestHeaders);
+
+ Set<NodeIdentifier> returnedNodeIds = new HashSet<>();
+ for (NodeResponse response : responses) {
+
+ // check if we received an exception
+ if (response.hasThrowable()) {
+ throw response.getThrowable();
+ }
+
+ // gather ids to verify later
+ returnedNodeIds.add(response.getNodeId());
+
+ // check status
+ assertEquals(Status.OK.getStatusCode(), response.getStatus());
+
+ Response serverResponse = response.getResponse();
+
+ // check response headers are copied
+ assertTrue(containsHeaders(expectedResponseHeaders, serverResponse.getMetadata()));
+
+ // check entity stream
+ if (HttpMethod.HEAD.equalsIgnoreCase(method)) {
+ assertNull(serverResponse.getEntity());
+ } else {
+ assertTrue(isEquals((StreamingOutput) serverResponse.getEntity(), expectedBody));
+ }
+
+ }
+
+ // check node Ids
+ assertEquals(nodeIds, returnedNodeIds);
+ }
+
+ private void testReplicateXXXEntity(final String method) throws Throwable {
+ testReplicateXXXEntity(executorThreadCount, method);
+ }
+
+ private void testReplicateXXXEntity(final int numNodes, final String method) throws Throwable {
+
+ // nodes
+ Set<NodeIdentifier> nodeIds = createNodes(numNodes, "localhost", serverPort);
+
+ // set up responses
+ for (int i = 0; i < nodeIds.size(); i++) {
+ HttpResponse response = new HttpResponse(Status.OK, expectedBody);
+ response.addHeaders(expectedResponseHeaders);
+ server.addResponseAction(new HttpResponseAction(response));
+ }
+
+ // headers
+ expectedRequestHeaders.put("Content-Type", "application/xml");
+
+ Set<NodeResponse> responses = replicator.replicate(
+ nodeIds,
+ method,
+ prototypeUri,
+ expectedEntity,
+ expectedRequestHeaders);
+
+ Set<NodeIdentifier> returnedNodeIds = new HashSet<>();
+ for (NodeResponse response : responses) {
+
+ // check if we received an exception
+ if (response.hasThrowable()) {
+ throw response.getThrowable();
+ }
+
+ // gather ids to verify later
+ returnedNodeIds.add(response.getNodeId());
+
+ // check status
+ assertEquals(Status.OK.getStatusCode(), response.getStatus());
+
+ Response serverResponse = response.getResponse();
+
+ // check response headers are copied
+ assertTrue(containsHeaders(expectedResponseHeaders, serverResponse.getMetadata()));
+
+ // check entity stream
+ assertTrue(isEquals((StreamingOutput) serverResponse.getEntity(), expectedBody));
+
+ }
+
+ // check node Ids
+ assertEquals(nodeIds, returnedNodeIds);
+ }
+
+ private Set<NodeIdentifier> createNodes(int num, String host, int apiPort) {
+ Set<NodeIdentifier> result = new HashSet<>();
+ for (int i = 0; i < num; i++) {
+ result.add(new NodeIdentifier(String.valueOf(i), host, apiPort, host, 1));
+ }
+ return result;
+ }
+
+ private boolean isEquals(StreamingOutput so, String expectedText) throws IOException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ so.write(baos);
+ return expectedText.equals(new String(baos.toByteArray()));
+ }
+
+ private boolean containsHeaders(Map<String, String> expectedHeaders, MultivaluedMap<String, Object> metadata) {
+ for (Map.Entry<String, String> expectedEntry : expectedHeaders.entrySet()) {
+ if (expectedEntry.getValue().equals(metadata.getFirst(expectedEntry.getKey())) == false) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+}
+
+@XmlRootElement
+class Entity {
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java
new file mode 100644
index 0000000..d45a4d1
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import java.io.ByteArrayInputStream;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.node.Node.Status;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * @author unattributed
+ */
+public class HttpResponseMapperImplTest {
+
+ private HttpResponseMapperImpl mapper;
+
+ private URI dummyUri;
+
+ @Before
+ public void setup() throws URISyntaxException {
+ mapper = new HttpResponseMapperImpl();
+ dummyUri = new URI("http://dummy.com");
+ }
+
+ @Test
+ public void testToNodeStatusWithNo2xxResponses() {
+
+ Set<NodeResponse> nodeResponses = new HashSet<>();
+ nodeResponses.add(createNodeResourceResponse("1", 400));
+ nodeResponses.add(createNodeResourceResponse("2", 100));
+ nodeResponses.add(createNodeResourceResponse("3", 300));
+ nodeResponses.add(createNodeResourceResponse("4", 500));
+
+ Map<NodeResponse, Status> map = mapper.map(dummyUri, nodeResponses);
+
+ // since no 2xx responses, any 5xx is disconnected
+ for (Map.Entry<NodeResponse, Status> entry : map.entrySet()) {
+ NodeResponse response = entry.getKey();
+ Status status = entry.getValue();
+ switch (response.getNodeId().getId()) {
+ case "1":
+ assertTrue(status == Node.Status.CONNECTED);
+ break;
+ case "2":
+ assertTrue(status == Node.Status.CONNECTED);
+ break;
+ case "3":
+ assertTrue(status == Node.Status.CONNECTED);
+ break;
+ case "4":
+ assertTrue(status == Node.Status.DISCONNECTED);
+ break;
+ }
+ }
+ }
+
+ @Test
+ public void testToNodeStatusWith2xxResponses() {
+
+ Set<NodeResponse> nodeResponses = new HashSet<>();
+ nodeResponses.add(createNodeResourceResponse("1", 200));
+ nodeResponses.add(createNodeResourceResponse("2", 100));
+ nodeResponses.add(createNodeResourceResponse("3", 300));
+ nodeResponses.add(createNodeResourceResponse("4", 500));
+
+ Map<NodeResponse, Status> map = mapper.map(dummyUri, nodeResponses);
+
+ // since there were 2xx responses, any non-2xx is disconnected
+ for (Map.Entry<NodeResponse, Status> entry : map.entrySet()) {
+ NodeResponse response = entry.getKey();
+ Status status = entry.getValue();
+ switch (response.getNodeId().getId()) {
+ case "1":
+ assertTrue(status == Node.Status.CONNECTED);
+ break;
+ case "2":
+ assertTrue(status == Node.Status.DISCONNECTED);
+ break;
+ case "3":
+ assertTrue(status == Node.Status.DISCONNECTED);
+ break;
+ case "4":
+ assertTrue(status == Node.Status.DISCONNECTED);
+ break;
+ }
+ }
+ }
+
+ private NodeResponse createNodeResourceResponse(String nodeId, int statusCode) {
+
+ ClientResponse clientResponse = mock(ClientResponse.class);
+ when(clientResponse.getStatus()).thenReturn(statusCode);
+ when(clientResponse.getHeaders()).thenReturn(new MultivaluedMapImpl());
+ when(clientResponse.getEntityInputStream()).thenReturn(new ByteArrayInputStream(new byte[0]));
+
+ NodeIdentifier nodeIdentifier = new NodeIdentifier(nodeId, "localhost", 1, "localhost", 1);
+ return new NodeResponse(nodeIdentifier, "GET", dummyUri, clientResponse, 1L, "111");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
new file mode 100644
index 0000000..7347a94
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import static org.junit.Assert.assertEquals;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.junit.Test;
+
+public class TestWebClusterManager {
+
+ @Test
+ public void testNormalizedStatusSnapshotDate() throws ParseException {
+ final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:SS.SSS");
+ final Date date1 = df.parse("2014/01/01 00:00:00.000");
+ final Date date2 = df.parse("2014/01/01 00:04:59.999");
+ final Date date3 = df.parse("2014/01/01 00:05:00.000");
+ final Date date4 = df.parse("2014/01/01 00:05:00.001");
+
+ final Date normalized1 = WebClusterManager.normalizeStatusSnapshotDate(date1, 300000);
+ assertEquals(date1, normalized1);
+
+ final Date normalized2 = WebClusterManager.normalizeStatusSnapshotDate(date2, 300000);
+ assertEquals(date1, normalized2);
+
+ final Date normalized3 = WebClusterManager.normalizeStatusSnapshotDate(date3, 300000);
+ assertEquals(date3, normalized3);
+
+ final Date normalized4 = WebClusterManager.normalizeStatusSnapshotDate(date4, 300000);
+ assertEquals(date3, normalized4);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
new file mode 100644
index 0000000..35380dd
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.testutils;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MediaType;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Encapsulates an HTTP request. The toString method returns the
+ * specification-compliant request.
+ *
+ * @author unattributed
+ */
+public class HttpRequest {
+
+ private String method;
+ private String uri;
+ private String rawUri;
+ private String version;
+ private String body;
+ private String rawRequest;
+ private Map<String, String> headers = new HashMap<>();
+ private Map<String, List<String>> parameters = new HashMap<>();
+
+ public static HttpRequestBuilder createFromRequestLine(final String requestLine) {
+ return new HttpRequestBuilder(requestLine);
+ }
+
+ public String getBody() {
+ return body;
+ }
+
+ public Map<String, String> getHeaders() {
+ return Collections.unmodifiableMap(headers);
+ }
+
+ public String getHeaderValue(final String header) {
+ for (final Map.Entry<String, String> entry : getHeaders().entrySet()) {
+ if (entry.getKey().equalsIgnoreCase(header)) {
+ return entry.getValue();
+ }
+ }
+ return null;
+ }
+
+ public String getMethod() {
+ return method;
+ }
+
+ public Map<String, List<String>> getParameters() {
+ final Map<String, List<String>> result = new HashMap<>();
+ for (final Map.Entry<String, List<String>> entry : parameters.entrySet()) {
+ result.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+ }
+ return Collections.unmodifiableMap(result);
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public String getRawUri() {
+ return rawUri;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ @Override
+ public String toString() {
+ return rawRequest;
+ }
+
+ /**
+ * A builder for constructing basic HTTP requests. It handles only enough of
+ * the HTTP specification to support basic unit testing, and it should not
+ * be used otherwise.
+ */
+ public static class HttpRequestBuilder {
+
+ private String method;
+ private String uri;
+ private String rawUri;
+ private String version;
+ private Map<String, String> headers = new HashMap<>();
+ private Map<String, List<String>> parameters = new HashMap<>();
+ private int contentLength = 0;
+ private String contentType;
+ private String body = "";
+ private StringBuilder rawRequest = new StringBuilder();
+
+ private HttpRequestBuilder(final String requestLine) {
+
+ final String[] tokens = requestLine.split(" ");
+ if (tokens.length != 3) {
+ throw new IllegalArgumentException("Invalid HTTP Request Line: " + requestLine);
+ }
+
+ method = tokens[0];
+ if (HttpMethod.GET.equalsIgnoreCase(method) || HttpMethod.HEAD.equalsIgnoreCase(method) || HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.OPTIONS.equalsIgnoreCase(method)) {
+ final int queryIndex = tokens[1].indexOf("?");
+ if (queryIndex > -1) {
+ uri = tokens[1].substring(0, queryIndex);
+ addParameters(tokens[1].substring(queryIndex + 1));
+ } else {
+ uri = tokens[1];
+ }
+ }
+ rawUri = tokens[1];
+ version = tokens[2];
+ rawRequest.append(requestLine).append("\n");
+ }
+
+ private void addHeader(final String key, final String value) {
+ if (key.contains(" ")) {
+ throw new IllegalArgumentException("Header key may not contain spaces.");
+ } else if ("content-length".equalsIgnoreCase(key)) {
+ contentLength = (StringUtils.isBlank(value.trim())) ? 0 : Integer.parseInt(value.trim());
+ } else if ("content-type".equalsIgnoreCase(key)) {
+ contentType = value.trim();
+ }
+ headers.put(key, value);
+ }
+
+ public void addHeader(final String header) {
+ final int firstColonIndex = header.indexOf(":");
+ if (firstColonIndex < 0) {
+ throw new IllegalArgumentException("Invalid HTTP Header line: " + header);
+ }
+ addHeader(header.substring(0, firstColonIndex), header.substring(firstColonIndex + 1));
+ rawRequest.append(header).append("\n");
+ }
+
+ // final because constructor calls it
+ public final void addParameters(final String queryString) {
+
+ if (StringUtils.isBlank(queryString)) {
+ return;
+ }
+
+ final String normQueryString;
+ if (queryString.startsWith("?")) {
+ normQueryString = queryString.substring(1);
+ } else {
+ normQueryString = queryString;
+ }
+ final String[] keyValuePairs = normQueryString.split("&");
+ for (final String keyValuePair : keyValuePairs) {
+ final String[] keyValueTokens = keyValuePair.split("=");
+ try {
+ addParameter(
+ URLDecoder.decode(keyValueTokens[0], "utf-8"),
+ URLDecoder.decode(keyValueTokens[1], "utf-8")
+ );
+ } catch (UnsupportedEncodingException use) {
+ throw new RuntimeException(use);
+ }
+ }
+ }
+
+ public void addParameter(final String key, final String value) {
+
+ if (key.contains(" ")) {
+ throw new IllegalArgumentException("Parameter key may not contain spaces: " + key);
+ }
+
+ final List<String> values;
+ if (parameters.containsKey(key)) {
+ values = parameters.get(key);
+ } else {
+ values = new ArrayList<>();
+ parameters.put(key, values);
+ }
+ values.add(value);
+ }
+
+ public void addBody(final Reader reader) throws IOException {
+
+ if (contentLength <= 0) {
+ return;
+ }
+
+ final char[] buf = new char[contentLength];
+ int offset = 0;
+ int numRead = 0;
+ while (offset < buf.length && (numRead = reader.read(buf, offset, buf.length - offset)) >= 0) {
+ offset += numRead;
+ }
+ body = new String(buf);
+ rawRequest.append("\n");
+ rawRequest.append(body);
+ }
+
+ public HttpRequest build() throws UnsupportedEncodingException {
+
+ if (HttpMethod.GET.equalsIgnoreCase(method) == false && HttpMethod.HEAD.equalsIgnoreCase(method) == false && contentType.equalsIgnoreCase(MediaType.APPLICATION_FORM_URLENCODED)) {
+ addParameters(body);
+ }
+
+ final HttpRequest request = new HttpRequest();
+ request.method = this.method;
+ request.uri = this.uri;
+ request.rawUri = this.rawUri;
+ request.version = this.version;
+ request.headers.putAll(this.headers);
+ request.parameters.putAll(this.parameters);
+ request.body = this.body;
+ request.rawRequest = this.rawRequest.toString();
+
+ return request;
+ }
+
+ }
+}