You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:34 UTC

[11/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
new file mode 100644
index 0000000..84565da
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.node;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a connected flow controller. Nodes always have an immutable
+ * identifier and a status. The status may be changed, but never null.
+ *
+ * A Node may be cloned, but the cloning is a shallow copy of the instance.
+ *
+ * This class overrides hashCode and equals and considers two instances to be
+ * equal if they have the equal NodeIdentifiers.
+ *
+ * @author unattributed
+ */
+public class Node implements Cloneable, Comparable<Node> {
+
+    private static final Logger lockLogger = LoggerFactory.getLogger("cluster.lock");
+
+    /**
+     * The semantics of a Node status are as follows:
+     * <ul>
+     * <li>CONNECTED -- a flow controller that is connected to the cluster. A
+     * connecting node transitions to connected after the cluster receives the
+     * flow controller's first heartbeat. A connected node can transition to
+     * disconnecting.</li>
+     * <li>CONNECTING -- a flow controller has issued a connection request to
+     * the cluster, but has not yet sent a heartbeat. A connecting node can
+     * transition to disconnecting or connected. The cluster will not accept any
+     * external requests to change the flow while any node is connecting.</li>
+     * <li>DISCONNECTED -- a flow controller that is not connected to the
+     * cluster. A disconnected node can transition to connecting.</li>
+     * <li>DISCONNECTING -- a flow controller that is in the process of
+     * disconnecting from the cluster. A disconnecting node will always
+     * transition to disconnected.</li>
+     * </ul>
+     */
+    public static enum Status {
+
+        CONNECTED,
+        CONNECTING,
+        DISCONNECTED,
+        DISCONNECTING
+    }
+
+    /**
+     * the node's unique identifier
+     */
+    private final NodeIdentifier nodeId;
+
+    /**
+     * the node statue
+     */
+    private Status status;
+
+    /**
+     * the last heartbeat received by from the node
+     */
+    private Heartbeat lastHeartbeat;
+
+    /**
+     * the payload of the last heartbeat received from the node
+     */
+    private HeartbeatPayload lastHeartbeatPayload;
+
+    /**
+     * the last time the connection for this node was requested
+     */
+    private AtomicLong connectionRequestedTimestamp = new AtomicLong(0L);
+
+    /**
+     * a flag to indicate this node was disconnected because of a lack of
+     * heartbeat
+     */
+    private boolean heartbeatDisconnection;
+
+    public Node(final NodeIdentifier id, final Status status) {
+        if (id == null) {
+            throw new IllegalArgumentException("ID may not be null.");
+        } else if (status == null) {
+            throw new IllegalArgumentException("Status may not be null.");
+        }
+        this.nodeId = id;
+        this.status = status;
+    }
+
+    public NodeIdentifier getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Returns the last received heartbeat or null if no heartbeat has been set.
+     *
+     * @return a heartbeat or null
+     */
+    public Heartbeat getHeartbeat() {
+        return lastHeartbeat;
+    }
+
+    public HeartbeatPayload getHeartbeatPayload() {
+        return lastHeartbeatPayload;
+    }
+
+    /**
+     * Sets the last heartbeat received.
+     *
+     * @param heartbeat a heartbeat
+     *
+     * @throws ProtocolException if the heartbeat's payload failed unmarshalling
+     */
+    public void setHeartbeat(final Heartbeat heartbeat) throws ProtocolException {
+        this.lastHeartbeat = heartbeat;
+        if (this.lastHeartbeat == null) {
+            this.lastHeartbeatPayload = null;
+        } else {
+            final byte[] payload = lastHeartbeat.getPayload();
+            if (payload == null || payload.length == 0) {
+                this.lastHeartbeatPayload = null;
+            } else {
+                this.lastHeartbeatPayload = HeartbeatPayload.unmarshal(payload);
+            }
+        }
+    }
+
+    /**
+     * Returns the time of the last received connection request for this node.
+     *
+     * @return the time when the connection request for this node was received.
+     */
+    public long getConnectionRequestedTimestamp() {
+        return connectionRequestedTimestamp.get();
+    }
+
+    /**
+     * Sets the time when the connection request for this node was last
+     * received.
+     *
+     * This method is thread-safe and may be called without obtaining any lock.
+     *
+     * @param connectionRequestedTimestamp
+     */
+    public void setConnectionRequestedTimestamp(long connectionRequestedTimestamp) {
+        this.connectionRequestedTimestamp.set(connectionRequestedTimestamp);
+    }
+
+    /**
+     * Returns true if the node was disconnected due to lack of heartbeat; false
+     * otherwise.
+     *
+     * @return true if the node was disconnected due to lack of heartbeat; false
+     * otherwise.
+     */
+    public boolean isHeartbeatDisconnection() {
+        return heartbeatDisconnection;
+    }
+
+    /**
+     * Sets the status to disconnected and flags the node as being disconnected
+     * by lack of heartbeat.
+     */
+    public void setHeartbeatDisconnection() {
+        setStatus(Status.DISCONNECTED);
+        heartbeatDisconnection = true;
+    }
+
+    /**
+     * @return the status
+     */
+    public Status getStatus() {
+        return status;
+    }
+
+    /**
+     * @param status a status
+     */
+    public void setStatus(final Status status) {
+        if (status == null) {
+            throw new IllegalArgumentException("Status may not be null.");
+        }
+        this.status = status;
+        heartbeatDisconnection = false;
+    }
+
+    @Override
+    public Node clone() {
+        final Node clone = new Node(nodeId, status);
+        clone.lastHeartbeat = lastHeartbeat;
+        clone.lastHeartbeatPayload = lastHeartbeatPayload;
+        clone.heartbeatDisconnection = heartbeatDisconnection;
+        clone.connectionRequestedTimestamp = connectionRequestedTimestamp;
+        return clone;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final Node other = (Node) obj;
+        if (this.nodeId != other.nodeId && (this.nodeId == null || !this.nodeId.equals(other.nodeId))) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 53 * hash + (this.nodeId != null ? this.nodeId.hashCode() : 0);
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return nodeId.toString();
+    }
+
+    @Override
+    public int compareTo(final Node o) {
+        if (o == null) {
+            return -1;
+        }
+        return getNodeId().getId().compareTo(o.getNodeId().getId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
new file mode 100644
index 0000000..e26d196
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.spring;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery;
+import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
+ * Factory bean for creating a singleton ClusterManagerProtocolServiceLocator
+ * instance. If the application is configured to act as the cluster manager,
+ * then null is always returned as the created instance.
+ *
+ * The cluster manager protocol service represents the socket endpoint for
+ * sending internal socket messages to the cluster manager.
+ */
+public class ClusterManagerProtocolServiceLocatorFactoryBean implements FactoryBean, ApplicationContextAware, DisposableBean {
+
+    private ApplicationContext applicationContext;
+
+    private ClusterServiceLocator locator;
+
+    private NiFiProperties properties;
+
+    @Override
+    public Object getObject() throws Exception {
+        /*
+         * If configured for the cluster manager, then the service locator is never used.  
+         */
+        if (properties.isClusterManager()) {
+            return null;
+        } else if (locator == null) {
+
+            if (properties.getClusterProtocolUseMulticast()) {
+
+                // get the service discovery instance
+                final ClusterServiceDiscovery serviceDiscovery = applicationContext.getBean("clusterManagerProtocolServiceDiscovery", ClusterServiceDiscovery.class);
+
+                // create service location configuration
+                final ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
+                config.setNumAttempts(properties.getClusterProtocolMulticastServiceLocatorAttempts());
+
+                final int delay = (int) FormatUtils.getTimeDuration(properties.getClusterProtocolMulticastServiceLocatorAttemptsDelay(), TimeUnit.SECONDS);
+                config.setTimeBetweenAttempts(delay);
+                config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
+
+                locator = new ClusterServiceLocator(serviceDiscovery);
+                locator.setAttemptsConfig(config);
+
+            } else {
+                final String serviceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class);
+                final InetSocketAddress serviceAddress = properties.getClusterNodeUnicastManagerProtocolAddress();
+                final DiscoverableService service = new DiscoverableServiceImpl(serviceName, serviceAddress);
+                locator = new ClusterServiceLocator(service);
+            }
+
+            // start the locator
+            locator.start();
+
+        }
+        return locator;
+
+    }
+
+    @Override
+    public Class getObjectType() {
+        return ClusterServiceLocator.class;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+
+    @Override
+    public void destroy() throws Exception {
+        if (locator != null && locator.isRunning()) {
+            locator.stop();
+        }
+    }
+
+    public void setProperties(NiFiProperties properties) {
+        this.properties = properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
new file mode 100644
index 0000000..ef72298
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.spring;
+
+import java.io.File;
+import org.apache.nifi.cluster.firewall.impl.FileBasedClusterNodeFirewall;
+import org.apache.nifi.util.NiFiProperties;
+import org.springframework.beans.factory.FactoryBean;
+
+/**
+ * Factory bean for creating a singleton FileBasedClusterNodeFirewall instance.
+ */
+public class FileBasedClusterNodeFirewallFactoryBean implements FactoryBean {
+
+    private FileBasedClusterNodeFirewall firewall;
+
+    private NiFiProperties properties;
+
+    @Override
+    public Object getObject() throws Exception {
+        if (firewall == null) {
+            final File config = properties.getClusterManagerNodeFirewallFile();
+            final File restoreDirectory = properties.getRestoreDirectory();
+            if (config != null) {
+                firewall = new FileBasedClusterNodeFirewall(config, restoreDirectory);
+            }
+        }
+        return firewall;
+    }
+
+    @Override
+    public Class getObjectType() {
+        return FileBasedClusterNodeFirewall.class;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public void setProperties(NiFiProperties properties) {
+        this.properties = properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
new file mode 100644
index 0000000..7169730
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.spring;
+
+import java.nio.file.Paths;
+import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.cluster.event.EventManager;
+import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
+import org.apache.nifi.cluster.flow.DataFlowManagementService;
+import org.apache.nifi.cluster.manager.HttpRequestReplicator;
+import org.apache.nifi.cluster.manager.HttpResponseMapper;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
+import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
+import org.apache.nifi.controller.service.ControllerServiceLoader;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import org.apache.nifi.util.NiFiProperties;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
+ * Factory bean for creating a singleton WebClusterManager instance. If the
+ * application is not configured to act as the cluster manager, then null is
+ * always returned as the created instance.
+ */
+public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationContextAware {
+
+    private ApplicationContext applicationContext;
+
+    private WebClusterManager clusterManager;
+
+    private NiFiProperties properties;
+
+    private StringEncryptor encryptor;
+
+    @Override
+    public Object getObject() throws Exception {
+        if (properties.isClusterManager() && properties.isNode()) {
+            throw new IllegalStateException("Application may be configured as a cluster manager or a node, but not both.");
+        } else if (!properties.isClusterManager()) {
+            /*
+             * If not configured for the cluster manager, then the cluster manager is never used.  
+             * null is returned so that we don't instantiate a thread pool or other resources. 
+             */
+            return null;
+        } else if (clusterManager == null) {
+
+            // get the service configuration path (fail early)
+            final String serviceConfigurationFile = properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE);
+            if (serviceConfigurationFile == null) {
+                throw new NullPointerException("The service configuration file has not been specified.");
+            }
+
+            final HttpRequestReplicator requestReplicator = applicationContext.getBean("httpRequestReplicator", HttpRequestReplicator.class);
+            final HttpResponseMapper responseMapper = applicationContext.getBean("httpResponseMapper", HttpResponseMapper.class);
+            final DataFlowManagementService dataFlowService = applicationContext.getBean("dataFlowManagementService", DataFlowManagementService.class);
+            final ClusterManagerProtocolSenderListener senderListener = applicationContext.getBean("clusterManagerProtocolSenderListener", ClusterManagerProtocolSenderListener.class);
+
+            // create the manager
+            clusterManager = new WebClusterManager(
+                    requestReplicator,
+                    responseMapper,
+                    dataFlowService,
+                    senderListener,
+                    properties,
+                    encryptor
+            );
+
+            // set the service broadcaster
+            if (properties.getClusterProtocolUseMulticast()) {
+
+                // create broadcaster
+                final ClusterServicesBroadcaster broadcaster = applicationContext.getBean("clusterServicesBroadcaster", ClusterServicesBroadcaster.class);
+
+                // register the cluster manager protocol service
+                final String clusterManagerProtocolServiceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class);
+                final DiscoverableService clusterManagerProtocolService = new DiscoverableServiceImpl(clusterManagerProtocolServiceName, properties.getClusterManagerProtocolAddress());
+                broadcaster.addService(clusterManagerProtocolService);
+
+                clusterManager.setServicesBroadcaster(broadcaster);
+            }
+
+            // set the event manager
+            clusterManager.setEventManager(applicationContext.getBean("nodeEventHistoryManager", EventManager.class));
+
+            // set the cluster firewall
+            clusterManager.setClusterFirewall(applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class));
+
+            // set the audit service
+            clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class));
+
+            // load the controller services
+            final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile));
+            serviceLoader.loadControllerServices(clusterManager);
+        }
+        return clusterManager;
+    }
+
+    @Override
+    public Class getObjectType() {
+        return WebClusterManager.class;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+
+    public void setProperties(NiFiProperties properties) {
+        this.properties = properties;
+    }
+
+    public void setEncryptor(final StringEncryptor encryptor) {
+        this.encryptor = encryptor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
new file mode 100644
index 0000000..1ed5b30
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.reporting;
+
+import org.apache.nifi.cluster.manager.impl.ClusteredReportingContext;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingTask;
+
+public class ClusteredReportingTaskNode extends AbstractReportingTaskNode {
+
+    private final EventAccess eventAccess;
+    private final BulletinRepository bulletinRepository;
+    private final ControllerServiceProvider serviceProvider;
+
+    public ClusteredReportingTaskNode(final ReportingTask reportingTask, final String id, final ProcessScheduler scheduler,
+            final EventAccess eventAccess, final BulletinRepository bulletinRepository, final ControllerServiceProvider serviceProvider,
+            final ValidationContextFactory validationContextFactory) {
+        super(reportingTask, id, serviceProvider, scheduler, validationContextFactory);
+
+        this.eventAccess = eventAccess;
+        this.bulletinRepository = bulletinRepository;
+        this.serviceProvider = serviceProvider;
+    }
+
+    @Override
+    public ReportingContext getReportingContext() {
+        return new ClusteredReportingContext(eventAccess, bulletinRepository, getProperties(), serviceProvider);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml b/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml
new file mode 100644
index 0000000..68c29bc
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- marked as lazy so that clustering beans are not created when the application runs in non-clustered mode -->
+<beans default-lazy-init="true"
+       xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xmlns:context="http://www.springframework.org/schema/context"
+       xmlns:aop="http://www.springframework.org/schema/aop"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
+    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
+    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
+    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd">
+
+    <!-- jersey client -->
+    <bean id="jersey-client" class="org.apache.nifi.web.util.WebUtils" factory-method="createClient">
+        <constructor-arg>
+            <bean class="com.sun.jersey.api.client.config.DefaultClientConfig"/>
+        </constructor-arg>
+        <constructor-arg>
+            <bean class="org.apache.nifi.framework.security.util.SslContextFactory" factory-method="createSslContext">
+                <constructor-arg ref="nifiProperties"/>
+            </bean>
+        </constructor-arg>
+    </bean>
+
+    <!-- http request replicator -->
+    <bean id="httpRequestReplicator" class="org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl">
+        <constructor-arg index="0">
+            <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiRequestThreads"/>
+        </constructor-arg>
+        <constructor-arg ref="jersey-client" index="1"/>
+        <constructor-arg index="2">
+            <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiConnectionTimeout"/>
+        </constructor-arg>
+        <constructor-arg index="3">
+            <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiReadTimeout"/>
+        </constructor-arg>
+        <property name="nodeProtocolScheme">
+            <bean factory-bean="nifiProperties" factory-method="getClusterProtocolManagerToNodeApiScheme"/>
+        </property>
+    </bean>
+    
+    <!-- http response mapper -->
+    <bean id="httpResponseMapper" class="org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl"/>
+
+    <!-- cluster flow DAO -->
+    <bean id="dataFlowDao" class="org.apache.nifi.cluster.flow.impl.DataFlowDaoImpl">
+        <constructor-arg index="0">
+            <bean factory-bean="nifiProperties" factory-method="getFlowConfigurationFileDir"/>
+        </constructor-arg>
+        <constructor-arg index="1">
+            <bean factory-bean="nifiProperties" factory-method="getRestoreDirectory"/>
+        </constructor-arg>
+        <constructor-arg index="2">
+            <bean factory-bean="nifiProperties" factory-method="getAutoResumeState"/>
+        </constructor-arg>
+    </bean>
+    
+    <!-- dataflow management service -->
+    <bean id="dataFlowManagementService" class="org.apache.nifi.cluster.flow.impl.DataFlowManagementServiceImpl">
+        <constructor-arg ref="dataFlowDao"/>
+        <constructor-arg ref="clusterManagerProtocolSender"/>
+        <property name="retrievalDelay">
+            <bean factory-bean="nifiProperties" factory-method="getClusterManagerFlowRetrievalDelay"/>
+        </property>
+    </bean>
+
+    <!-- node event history manager -->
+    <bean id="nodeEventHistoryManager" class="org.apache.nifi.cluster.event.impl.EventManagerImpl">
+        <constructor-arg>
+            <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeEventHistorySize"/>
+        </constructor-arg>
+    </bean>
+
+    <!-- cluster firewall -->
+    <bean id="clusterFirewall" class="org.apache.nifi.cluster.spring.FileBasedClusterNodeFirewallFactoryBean">
+        <property name="properties" ref="nifiProperties"/>
+    </bean>
+
+    <!-- cluster manager -->
+    <bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean">
+        <property name="properties" ref="nifiProperties"/>
+        <property name="encryptor" ref="stringEncryptor"/>
+    </bean>
+    
+    <!-- discoverable services -->
+    
+    <!-- cluster manager protocol discoverable service -->
+    
+    <!-- service name for communicating with the cluster manager using sockets -->
+    <bean id="clusterManagerProtocolServiceName" class="java.lang.String">
+        <constructor-arg value="cluster-manager-protocol" />
+    </bean>
+    
+    <!-- cluster manager protocol service discovery -->
+    <bean id="clusterManagerProtocolServiceDiscovery" class="org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery">
+        <constructor-arg ref="clusterManagerProtocolServiceName" index="0"/>
+        <constructor-arg index="1">
+            <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastAddress"/>
+        </constructor-arg>
+        <constructor-arg ref="protocolMulticastConfiguration" index="2"/>
+        <constructor-arg ref="protocolContext" index="3"/>
+    </bean>
+    
+    <!-- cluster manager protocol service locator -->
+    <bean id="clusterManagerProtocolServiceLocator" class="org.apache.nifi.cluster.spring.ClusterManagerProtocolServiceLocatorFactoryBean">
+        <property name="properties" ref="nifiProperties"/>
+    </bean>
+        
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java
new file mode 100644
index 0000000..09ea44b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.event.impl;
+
+import org.apache.nifi.cluster.event.impl.EventManagerImpl;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.nifi.cluster.event.Event;
+import org.apache.nifi.cluster.event.Event.Category;
+import org.apache.nifi.cluster.event.EventManager;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for {@link EventManagerImpl}, covering history-size validation,
+ * retrieval ordering (newest first), eviction when the history size is
+ * exceeded, and clearing of a source's history.
+ */
+public class EventManagerImplTest {
+
+    /** The history size must be positive; zero is rejected at construction. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testNonPositiveHistorySize() {
+        new EventManagerImpl(0);
+    }
+
+    /** An unknown source yields an empty list rather than null. */
+    @Test
+    public void testGetEventsUnknownSource() {
+        EventManager manager = new EventManagerImpl(1);
+        // use the type-safe factory instead of the raw Collections.EMPTY_LIST constant
+        assertEquals(Collections.emptyList(), manager.getEvents("unknown value"));
+    }
+
+    /** Events for a source are returned newest to oldest. */
+    @Test
+    public void testGetEvents() {
+
+        EventManager manager = new EventManagerImpl(2);
+
+        Event e1 = new Event("1", "Event1", Category.INFO, 0);
+        Event e2 = new Event("1", "Event2", Category.INFO, 1);
+
+        manager.addEvent(e1);
+        manager.addEvent(e2);
+
+        List<Event> events = manager.getEvents("1");
+
+        // assert newest to oldest
+        assertEquals(Arrays.asList(e2, e1), events);
+    }
+
+    /** An unknown source has no most-recent event. */
+    @Test
+    public void testGetMostRecentEventUnknownSource() {
+        EventManager manager = new EventManagerImpl(1);
+        assertNull(manager.getMostRecentEvent("unknown value"));
+    }
+
+    /** The most recent event is the last one added for the source. */
+    @Test
+    public void testGetMostRecentEvent() {
+
+        EventManager manager = new EventManagerImpl(2);
+
+        Event e1 = new Event("1", "Event1", Category.INFO, 0);
+        Event e2 = new Event("1", "Event2", Category.INFO, 1);
+
+        manager.addEvent(e1);
+        manager.addEvent(e2);
+
+        // assert newest to oldest
+        assertEquals(e2, manager.getMostRecentEvent("1"));
+    }
+
+    /** Adding beyond the history size evicts the oldest event. */
+    @Test
+    public void testAddEventExceedsHistorySize() {
+
+        EventManager manager = new EventManagerImpl(1);
+
+        Event e1 = new Event("1", "Event1", Category.INFO, 0);
+        Event e2 = new Event("1", "Event2", Category.INFO, 1);
+
+        manager.addEvent(e1);
+        manager.addEvent(e2);
+
+        List<Event> events = manager.getEvents("1");
+
+        // assert oldest evicted
+        assertEquals(Arrays.asList(e2), events);
+
+    }
+
+    /** Clearing a source's history removes all of its events. */
+    @Test
+    public void testClearHistory() {
+
+        EventManager manager = new EventManagerImpl(1);
+
+        Event e1 = new Event("1", "Event1", Category.INFO, 0);
+        Event e2 = new Event("1", "Event2", Category.INFO, 1);
+
+        manager.addEvent(e1);
+        manager.addEvent(e2);
+
+        manager.clearEventHistory("1");
+
+        // assert all events for the source were removed
+        assertTrue(manager.getEvents("1").isEmpty());
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
new file mode 100644
index 0000000..2fcf7ef
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.firewall.impl;
+
+import org.apache.nifi.cluster.firewall.impl.FileBasedClusterNodeFirewall;
+import java.io.File;
+import java.io.IOException;
+import org.apache.nifi.file.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+public class FileBasedClusterNodeFirewallTest {
+
+    // firewall backed by a config file listing permitted IPs and subnets
+    private FileBasedClusterNodeFirewall ipsFirewall;
+
+    // firewall backed by an empty config file; expected to permit every caller
+    private FileBasedClusterNodeFirewall acceptAllFirewall;
+
+    private File ipsConfig;
+    private File emptyConfig;
+    private File restoreDirectory;
+
+    @Before
+    public void setup() throws Exception {
+        ipsConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt");
+        emptyConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt");
+        restoreDirectory = new File(System.getProperty("java.io.tmpdir") + "/firewall_restore");
+
+        ipsFirewall = new FileBasedClusterNodeFirewall(ipsConfig, restoreDirectory);
+        acceptAllFirewall = new FileBasedClusterNodeFirewall(emptyConfig);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        deleteRecursively(restoreDirectory);
+    }
+
+    // the config file should have been copied verbatim into the restore directory
+    @Test
+    public void testSyncWithRestore() {
+        final File restoredCopy = new File(restoreDirectory, ipsConfig.getName());
+        assertEquals(ipsConfig.length(), restoredCopy.length());
+    }
+
+    @Test
+    public void testIsPermissibleWithExactMatch() {
+        assertTrue(ipsFirewall.isPermissible("2.2.2.2"));
+    }
+
+    @Test
+    public void testIsPermissibleWithSubnetMatch() {
+        assertTrue(ipsFirewall.isPermissible("3.3.3.255"));
+    }
+
+    @Test
+    public void testIsPermissibleWithNoMatch() {
+        assertFalse(ipsFirewall.isPermissible("255.255.255.255"));
+    }
+
+    @Test
+    public void testIsPermissibleWithMalformedData() {
+        assertFalse(ipsFirewall.isPermissible("abc"));
+    }
+
+    @Test
+    public void testIsPermissibleWithEmptyConfig() {
+        assertTrue(acceptAllFirewall.isPermissible("1.1.1.1"));
+    }
+
+    @Test
+    public void testIsPermissibleWithEmptyConfigWithMalformedData() {
+        assertTrue(acceptAllFirewall.isPermissible("abc"));
+    }
+
+    // deletes a file; when it is a directory, its contents are removed first
+    private boolean deleteRecursively(final File file) {
+        if (file.isDirectory()) {
+            FileUtils.deleteFilesInDir(file, null, null, true, true);
+        }
+        return FileUtils.deleteFile(file, null, 10);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
new file mode 100644
index 0000000..6294dfc
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.flow.impl;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.nifi.cluster.flow.DataFlowDao;
+import org.apache.nifi.cluster.flow.PersistedFlowState;
+import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl;
+import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+/**
+ * Tests for {@link DataFlowManagementServiceImpl}. Each test stands up a real
+ * socket-based {@link SocketProtocolListener} and verifies that the service's
+ * background flow retriever transitions the persisted flow state (STALE ->
+ * CURRENT) as node identifiers are registered and the flow is marked stale.
+ * Several tests poll for up to ~30 seconds via {@link #waitForState}.
+ */
+public class DataFlowManagementServiceImplTest {
+
+    private DataFlowManagementServiceImpl service;
+    private File restoreLocation;
+    private File primaryLocation;
+    private DataFlowDao dao;
+    private int apiDummyPort;   // placeholder API port; nothing listens on it
+    private int socketPort;     // real port the protocol listener is bound to
+    private SocketConfiguration socketConfig;
+    private ClusterManagerProtocolSender sender;
+    private ServerSocketConfiguration serverSocketConfig;
+    private SocketProtocolListener listener;
+
+    @Before
+    public void setup() throws IOException {
+
+        primaryLocation = new File(System.getProperty("java.io.tmpdir") + "/primary");
+        restoreLocation = new File(System.getProperty("java.io.tmpdir") + "/restore");
+
+        // start from a clean slate so state from a prior run cannot leak in
+        FileUtils.deleteDirectory(primaryLocation);
+        FileUtils.deleteDirectory(restoreLocation);
+
+        ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        socketConfig = new SocketConfiguration();
+        socketConfig.setSocketTimeout(1000);
+        serverSocketConfig = new ServerSocketConfiguration();
+
+        dao = new DataFlowDaoImpl(primaryLocation, restoreLocation, false);
+
+        sender = new ClusterManagerProtocolSenderImpl(socketConfig, protocolContext);
+
+        service = new DataFlowManagementServiceImpl(dao, sender);
+        service.start();
+
+        // port 0 asks the OS for an ephemeral port; the actual port is read back below
+        listener = new SocketProtocolListener(1, 0, serverSocketConfig, protocolContext);
+        listener.start();
+
+        apiDummyPort = 7777;
+        socketPort = listener.getPort();
+    }
+
+    @After
+    public void teardown() throws IOException {
+
+        // best-effort shutdown; a failure here should not mask the test result
+        if (service != null && service.isRunning()) {
+            service.stop();
+        }
+
+        if (listener != null && listener.isRunning()) {
+            try {
+                listener.stop();
+            } catch (final Exception ex) {
+                ex.printStackTrace(System.out);
+            }
+        }
+
+    }
+
+    @Test
+    public void testLoadFlowWithNonExistentFlow() throws ParserConfigurationException, SAXException, IOException {
+        verifyFlow();
+    }
+
+    @Test
+    public void testLoadFlowWithNonExistentFlowWhenServiceStopped() throws IOException, SAXException, ParserConfigurationException {
+        service.stop();
+        verifyFlow();
+    }
+
+    // parses the loaded flow XML and asserts the default root group name is present
+    private void verifyFlow() throws ParserConfigurationException, SAXException, IOException {
+        final byte[] flowBytes = service.loadDataFlow().getDataFlow().getFlow();
+        final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+        final Document doc = docBuilder.parse(new ByteArrayInputStream(flowBytes));
+        final Element controller = (Element) doc.getElementsByTagName("flowController").item(0);
+        final Element rootGroup = (Element) controller.getElementsByTagName("rootGroup").item(0);
+        final String rootGroupName = rootGroup.getElementsByTagName("name").item(0).getTextContent();
+        assertEquals("NiFi Flow", rootGroupName);
+    }
+
+    @Test
+    public void testLoadFlowSingleNode() throws Exception {
+        String flowStr = "<rootGroup />";
+        // NOTE(review): flowBytes uses the platform charset while the handler's copy
+        // is UTF-8; the two are equal for this ASCII-only flow string
+        byte[] flowBytes = flowStr.getBytes();
+        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+
+        NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort);
+        service.setNodeIds(new HashSet<>(Arrays.asList(nodeId)));
+        service.setPersistedFlowState(PersistedFlowState.STALE);
+
+        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+        // sleep long enough for the flow retriever to run
+        waitForState(PersistedFlowState.CURRENT);
+
+        assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
+        assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
+
+    }
+
+    @Test
+    public void testLoadFlowWithSameNodeIds() throws Exception {
+
+        String flowStr = "<rootGroup />";
+        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+
+        NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort);
+        NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
+        service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
+        service.setPersistedFlowState(PersistedFlowState.STALE);
+
+        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+        // sleep long enough for the flow retriever to run
+        waitForState(PersistedFlowState.CURRENT);
+
+        // verify that flow is current
+        assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
+
+        // add same ids in different order; an equal set must not mark the flow stale
+        service.setNodeIds(new HashSet<>(Arrays.asList(nodeId2, nodeId1)));
+
+        // verify flow is still current
+        assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
+
+    }
+
+    @Test
+    public void testLoadFlowWithABadNode() throws Exception {
+
+        String flowStr = "<rootGroup />";
+        byte[] flowBytes = flowStr.getBytes();
+        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+
+        // nodeId1 points at socketPort + 1, where nothing listens, so it is unreachable;
+        // retrieval should still succeed via the good node
+        NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1);
+        NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
+        service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
+        service.setPersistedFlowState(PersistedFlowState.STALE);
+
+        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+        // sleep long enough for the flow retriever to run
+        waitForState(PersistedFlowState.CURRENT);
+
+        assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
+        assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
+
+    }
+
+    @Test
+    public void testLoadFlowWithConstantNodeIdChanging() throws Exception {
+        String flowStr = "<rootGroup />";
+        byte[] flowBytes = flowStr.getBytes();
+        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+
+        NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1);
+        NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
+
+        // hammer the service with repeated id updates and stale marks
+        for (int i = 0; i < 1000; i++) {
+            service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
+            service.setPersistedFlowState(PersistedFlowState.STALE);
+            assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+        }
+
+        // sleep long enough for the flow retriever to run
+        waitForState(PersistedFlowState.CURRENT);
+
+        assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
+        assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
+    }
+
+    @Test
+    public void testLoadFlowWithConstantNodeIdChangingWithRetrievalDelay() throws Exception {
+
+        String flowStr = "<rootGroup />";
+        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+
+        NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1);
+        NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
+
+        // with a 5 second retrieval delay the retriever should not have run yet
+        service.setRetrievalDelay("5 sec");
+        for (int i = 0; i < 1000; i++) {
+            service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
+            service.setPersistedFlowState(PersistedFlowState.STALE);
+            assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+        }
+
+        // sleep long enough for the flow retriever to run
+        waitForState(PersistedFlowState.STALE);
+
+        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+    }
+
+    @Test
+    public void testStopRequestedWhileRetrieving() throws Exception {
+
+        String flowStr = "<rootGroup />";
+        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+        // many unreachable ids plus one good one keeps the retriever busy long enough to stop it mid-run
+        Set<NodeIdentifier> nodeIds = new HashSet<>();
+        for (int i = 0; i < 1000; i++) {
+            nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1));
+        }
+        nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort));
+
+        long lastRetrievalTime = service.getLastRetrievalTime();
+
+        service.setNodeIds(nodeIds);
+        service.setPersistedFlowState(PersistedFlowState.STALE);
+        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+        // sleep long enough for the flow retriever to run
+        waitForState(PersistedFlowState.STALE);
+
+        service.stop();
+
+        service.setPersistedFlowState(PersistedFlowState.STALE);
+        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+        // stopped service must not have completed a retrieval
+        assertEquals(lastRetrievalTime, service.getLastRetrievalTime());
+
+    }
+
+    @Test
+    public void testLoadFlowUnknownState() throws Exception {
+
+        String flowStr = "<rootGroup />";
+        byte[] flowBytes = flowStr.getBytes();
+        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+        NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort);
+
+        service.setNodeIds(new HashSet<>(Arrays.asList(nodeId)));
+        service.setPersistedFlowState(PersistedFlowState.STALE);
+        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+        // transition through UNKNOWN and back to STALE before retrieval completes
+        service.setPersistedFlowState(PersistedFlowState.UNKNOWN);
+
+        assertEquals(PersistedFlowState.UNKNOWN, service.getPersistedFlowState());
+
+        service.setPersistedFlowState(PersistedFlowState.STALE);
+        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
+
+        // sleep long enough for the flow retriever to run
+        waitForState(PersistedFlowState.CURRENT);
+
+        assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
+
+    }
+
+    /** Protocol handler stub that answers every message with a canned flow response. */
+    private class FlowRequestProtocolHandler implements ProtocolHandler {
+
+        private StandardDataFlow dataFlow;
+
+        public FlowRequestProtocolHandler(final StandardDataFlow dataFlow) {
+            this.dataFlow = dataFlow;
+        }
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+            FlowResponseMessage response = new FlowResponseMessage();
+            response.setDataFlow(dataFlow);
+            return response;
+        }
+
+    }
+
+    // polls once per second, up to 30 seconds, for the service to reach the given state
+    private void waitForState(PersistedFlowState state) throws InterruptedException {
+        for (int i = 0; i < 30; i++) {
+            if (service.getPersistedFlowState() == state) {
+                break;
+            } else {
+                Thread.sleep(1000);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
new file mode 100644
index 0000000..0c65aba
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MultivaluedMap;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Iterator;
+import javax.ws.rs.core.StreamingOutput;
+import org.apache.nifi.cluster.manager.testutils.HttpResponse;
+import org.apache.nifi.cluster.manager.testutils.HttpServer;
+import com.sun.jersey.api.client.Client;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.ws.rs.core.Response.Status;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.nifi.cluster.manager.testutils.HttpResponseAction;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import static org.junit.Assert.*;
+
+/**
+ * @author unattributed
+ */
+public class HttpRequestReplicatorImplTest {
+
+    // Jersey client the replicator uses to issue the replicated requests
+    private Client client;
+    private HttpRequestReplicatorImpl replicator;
+    // number of threads the replicator uses to fan requests out to nodes
+    private int executorThreadCount;
+    // number of threads the embedded test HTTP server uses to answer them
+    private int serverThreadCount;
+    private int serverPort;
+    private HttpServer server;
+    // parameters/headers the test server is configured to expect on each request
+    private Map<String, List<String>> expectedRequestParameters;
+    private Map<String, String> expectedRequestHeaders;
+    // headers attached to the canned responses; verified on each NodeResponse
+    private Map<String, String> expectedResponseHeaders;
+    // JAXB entity sent as the request body in the *WithEntity tests
+    private Object expectedEntity;
+    private String expectedBody;
+    // URI whose path/query is replicated to every node
+    private URI prototypeUri;
+
+    @Before
+    public void setUp() throws IOException, URISyntaxException {
+
+        executorThreadCount = 5;
+        serverThreadCount = 3;
+
+        client = Client.create();
+
+        // "1 sec" values are the connect/read timeouts used by the replicator
+        replicator = new HttpRequestReplicatorImpl(executorThreadCount, client, "1 sec", "1 sec");
+        replicator.start();
+
+        expectedRequestHeaders = new HashMap<>();
+        expectedRequestHeaders.put("header1", "header value1");
+        expectedRequestHeaders.put("header2", "header value2");
+
+        expectedRequestParameters = new HashMap<>();
+        expectedRequestParameters.put("param1", Arrays.asList("p value1"));
+        expectedRequestParameters.put("param2", Arrays.asList("p value2"));
+
+        expectedResponseHeaders = new HashMap<>();
+        expectedResponseHeaders.put("header1", "header value1");
+        expectedResponseHeaders.put("header2", "header value2");
+
+        expectedEntity = new Entity();
+
+        expectedBody = "some text";
+
+        prototypeUri = new URI("http://prototype.host/path/to/resource");
+
+        // port 0: let the OS pick an ephemeral port, then read it back
+        server = new HttpServer(serverThreadCount, 0);
+        server.start();
+        serverPort = server.getPort();
+    }
+
+    @After
+    public void teardown() {
+        if (server.isRunning()) {
+            server.stop();
+        }
+        if (replicator.isRunning()) {
+            replicator.stop();
+        }
+    }
+
+    @Test
+    public void testReplicateGetLessNodesThanReplicatorThreads() throws Throwable {
+        testReplicateXXX(executorThreadCount - 1, HttpMethod.GET);
+    }
+
+    @Test
+    public void testReplicateGetMoreNodesThanReplicatorThreads() throws Throwable {
+        testReplicateXXX(executorThreadCount + 1, HttpMethod.GET);
+    }
+
+    @Test
+    public void testReplicateGetWithUnresponsiveNode() throws Throwable {
+
+        // nodes
+        Set<NodeIdentifier> nodeIds = createNodes(2, "localhost", serverPort);
+
+        // response
+        HttpResponse expectedResponse = new HttpResponse(Status.OK, expectedBody);
+
+        // first response normal, second response slow
+        // NOTE: 3500 ms delay appears intended to exceed the replicator's
+        // 1 sec timeout so the slow node surfaces as a Throwable — confirm
+        server.addResponseAction(new HttpResponseAction(expectedResponse));
+        server.addResponseAction(new HttpResponseAction(expectedResponse, 3500));
+
+        Set<NodeResponse> responses = replicator.replicate(
+                nodeIds,
+                HttpMethod.GET,
+                prototypeUri,
+                expectedRequestParameters,
+                expectedRequestHeaders);
+
+        assertEquals(nodeIds.size(), responses.size());
+
+        // the response set is unordered, so identify the good/bad responses
+        // by whether they carry a Throwable rather than by position
+        Iterator<NodeResponse> nodeResponseItr = responses.iterator();
+
+        NodeResponse firstResponse = nodeResponseItr.next();
+        NodeResponse secondResponse = nodeResponseItr.next();
+        NodeResponse goodResponse;
+        NodeResponse badResponse;
+        if (firstResponse.hasThrowable()) {
+            goodResponse = secondResponse;
+            badResponse = firstResponse;
+        } else {
+            goodResponse = firstResponse;
+            badResponse = secondResponse;
+        }
+
+        // good response
+        // check status
+        assertEquals(Status.OK.getStatusCode(), goodResponse.getStatus());
+
+        // check entity stream
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ((StreamingOutput) goodResponse.getResponse().getEntity()).write(baos);
+        assertEquals("some text", new String(baos.toByteArray()));
+
+        // bad response
+        assertTrue(badResponse.hasThrowable());
+        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), badResponse.getStatus());
+
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testReplicateGetWithEntity() throws Throwable {
+        testReplicateXXXEntity(HttpMethod.GET);
+    }
+
+    @Test
+    public void testReplicatePost() throws Throwable {
+        testReplicateXXX(HttpMethod.POST);
+    }
+
+    @Test
+    public void testReplicatePostWithEntity() throws Throwable {
+        testReplicateXXXEntity(HttpMethod.POST);
+    }
+
+    @Test
+    public void testReplicatePut() throws Throwable {
+        testReplicateXXX(HttpMethod.PUT);
+    }
+
+    @Test
+    public void testReplicatePutWithEntity() throws Throwable {
+        testReplicateXXXEntity(HttpMethod.PUT);
+    }
+
+    @Test
+    public void testReplicateDelete() throws Throwable {
+        testReplicateXXX(HttpMethod.DELETE);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testReplicateDeleteWithEntity() throws Throwable {
+        testReplicateXXXEntity(HttpMethod.DELETE);
+    }
+
+    @Test
+    public void testReplicateHead() throws Throwable {
+        testReplicateXXX(HttpMethod.HEAD);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testReplicateHeadWithEntity() throws Throwable {
+        testReplicateXXXEntity(HttpMethod.HEAD);
+    }
+
+    @Test
+    public void testReplicateOptions() throws Throwable {
+        testReplicateXXX(HttpMethod.OPTIONS);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testReplicateOptionsWithEntity() throws Throwable {
+        testReplicateXXXEntity(HttpMethod.OPTIONS);
+    }
+
+    // replicate an entity-less request to one node per replicator thread
+    private void testReplicateXXX(final String method) throws Throwable {
+        testReplicateXXX(executorThreadCount, method);
+    }
+
+    /**
+     * Replicates an entity-less request of the given method to numNodes nodes
+     * (all backed by the same embedded server) and verifies that every node
+     * answered OK with the expected headers, body, and node identifier.
+     */
+    private void testReplicateXXX(final int numNodes, final String method) throws Throwable {
+
+        // nodes
+        Set<NodeIdentifier> nodeIds = createNodes(numNodes, "localhost", serverPort);
+
+        // set up responses
+        for (int i = 0; i < nodeIds.size(); i++) {
+            HttpResponse response = new HttpResponse(Status.OK, expectedBody);
+            response.addHeaders(expectedResponseHeaders);
+            server.addResponseAction(new HttpResponseAction(response));
+        }
+
+        // setup request parameters
+        server.addCheckedParameters(expectedRequestParameters);
+
+        // request headers
+        server.addCheckedHeaders(expectedRequestHeaders);
+
+        Set<NodeResponse> responses = replicator.replicate(
+                nodeIds,
+                method,
+                prototypeUri,
+                expectedRequestParameters,
+                expectedRequestHeaders);
+
+        Set<NodeIdentifier> returnedNodeIds = new HashSet<>();
+        for (NodeResponse response : responses) {
+
+            // check if we received an exception
+            if (response.hasThrowable()) {
+                throw response.getThrowable();
+            }
+
+            // gather ids to verify later
+            returnedNodeIds.add(response.getNodeId());
+
+            // check status
+            assertEquals(Status.OK.getStatusCode(), response.getStatus());
+
+            Response serverResponse = response.getResponse();
+
+            // check response headers are copied
+            assertTrue(containsHeaders(expectedResponseHeaders, serverResponse.getMetadata()));
+
+            // check entity stream
+            if (HttpMethod.HEAD.equalsIgnoreCase(method)) {
+                // HEAD responses carry no body
+                assertNull(serverResponse.getEntity());
+            } else {
+                assertTrue(isEquals((StreamingOutput) serverResponse.getEntity(), expectedBody));
+            }
+
+        }
+
+        // check node Ids
+        assertEquals(nodeIds, returnedNodeIds);
+    }
+
+    // replicate an entity-carrying request to one node per replicator thread
+    private void testReplicateXXXEntity(final String method) throws Throwable {
+        testReplicateXXXEntity(executorThreadCount, method);
+    }
+
+    /**
+     * Replicates a request carrying an XML entity body to numNodes nodes and
+     * verifies every node answered OK with the expected headers, body, and
+     * node identifier. Methods that may not carry an entity (GET, HEAD,
+     * DELETE, OPTIONS) are expected to throw IllegalArgumentException.
+     */
+    private void testReplicateXXXEntity(final int numNodes, final String method) throws Throwable {
+
+        // nodes
+        Set<NodeIdentifier> nodeIds = createNodes(numNodes, "localhost", serverPort);
+
+        // set up responses
+        for (int i = 0; i < nodeIds.size(); i++) {
+            HttpResponse response = new HttpResponse(Status.OK, expectedBody);
+            response.addHeaders(expectedResponseHeaders);
+            server.addResponseAction(new HttpResponseAction(response));
+        }
+
+        // headers
+        expectedRequestHeaders.put("Content-Type", "application/xml");
+
+        Set<NodeResponse> responses = replicator.replicate(
+                nodeIds,
+                method,
+                prototypeUri,
+                expectedEntity,
+                expectedRequestHeaders);
+
+        Set<NodeIdentifier> returnedNodeIds = new HashSet<>();
+        for (NodeResponse response : responses) {
+
+            // check if we received an exception
+            if (response.hasThrowable()) {
+                throw response.getThrowable();
+            }
+
+            // gather ids to verify later
+            returnedNodeIds.add(response.getNodeId());
+
+            // check status
+            assertEquals(Status.OK.getStatusCode(), response.getStatus());
+
+            Response serverResponse = response.getResponse();
+
+            // check response headers are copied
+            assertTrue(containsHeaders(expectedResponseHeaders, serverResponse.getMetadata()));
+
+            // check entity stream
+            assertTrue(isEquals((StreamingOutput) serverResponse.getEntity(), expectedBody));
+
+        }
+
+        // check node Ids
+        assertEquals(nodeIds, returnedNodeIds);
+    }
+
+    // builds num node identifiers ("0".."num-1") that all point at the same host/port
+    private Set<NodeIdentifier> createNodes(int num, String host, int apiPort) {
+        Set<NodeIdentifier> result = new HashSet<>();
+        for (int i = 0; i < num; i++) {
+            result.add(new NodeIdentifier(String.valueOf(i), host, apiPort, host, 1));
+        }
+        return result;
+    }
+
+    // drains the streaming output and compares it (platform charset) to the expected text
+    private boolean isEquals(StreamingOutput so, String expectedText) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        so.write(baos);
+        return expectedText.equals(new String(baos.toByteArray()));
+    }
+
+    // true if every expected header appears (as its first value) in the response metadata
+    private boolean containsHeaders(Map<String, String> expectedHeaders, MultivaluedMap<String, Object> metadata) {
+        for (Map.Entry<String, String> expectedEntry : expectedHeaders.entrySet()) {
+            if (expectedEntry.getValue().equals(metadata.getFirst(expectedEntry.getKey())) == false) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+}
+
+// Minimal JAXB-annotated type used as the request entity when testing
+// replication of requests that carry an XML body.
+@XmlRootElement
+class Entity {
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java
new file mode 100644
index 0000000..d45a4d1
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import java.io.ByteArrayInputStream;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.node.Node.Status;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * @author unattributed
+ */
+public class HttpResponseMapperImplTest {
+
+    private HttpResponseMapperImpl mapper;
+
+    private URI dummyUri;
+
+    @Before
+    public void setup() throws URISyntaxException {
+        mapper = new HttpResponseMapperImpl();
+        dummyUri = new URI("http://dummy.com");
+    }
+
+    /**
+     * When no node returns a 2xx, only nodes returning 5xx are marked
+     * DISCONNECTED; all others remain CONNECTED.
+     */
+    @Test
+    public void testToNodeStatusWithNo2xxResponses() {
+
+        Set<NodeResponse> nodeResponses = new HashSet<>();
+        nodeResponses.add(createNodeResourceResponse("1", 400));
+        nodeResponses.add(createNodeResourceResponse("2", 100));
+        nodeResponses.add(createNodeResourceResponse("3", 300));
+        nodeResponses.add(createNodeResourceResponse("4", 500));
+
+        Map<NodeResponse, Status> map = mapper.map(dummyUri, nodeResponses);
+
+        // since no 2xx responses, any 5xx is disconnected
+        for (Map.Entry<NodeResponse, Status> entry : map.entrySet()) {
+            NodeResponse response = entry.getKey();
+            Status status = entry.getValue();
+            switch (response.getNodeId().getId()) {
+                case "1":
+                case "2":
+                case "3":
+                    // assertEquals gives the actual status on failure,
+                    // unlike assertTrue(status == ...)
+                    assertEquals(Node.Status.CONNECTED, status);
+                    break;
+                case "4":
+                    assertEquals(Node.Status.DISCONNECTED, status);
+                    break;
+                default:
+                    // fail loudly rather than silently skipping an unknown node
+                    fail("Unexpected node id: " + response.getNodeId().getId());
+            }
+        }
+    }
+
+    /**
+     * When at least one node returns a 2xx, every node that did not return
+     * a 2xx is marked DISCONNECTED.
+     */
+    @Test
+    public void testToNodeStatusWith2xxResponses() {
+
+        Set<NodeResponse> nodeResponses = new HashSet<>();
+        nodeResponses.add(createNodeResourceResponse("1", 200));
+        nodeResponses.add(createNodeResourceResponse("2", 100));
+        nodeResponses.add(createNodeResourceResponse("3", 300));
+        nodeResponses.add(createNodeResourceResponse("4", 500));
+
+        Map<NodeResponse, Status> map = mapper.map(dummyUri, nodeResponses);
+
+        // since there were 2xx responses, any non-2xx is disconnected
+        for (Map.Entry<NodeResponse, Status> entry : map.entrySet()) {
+            NodeResponse response = entry.getKey();
+            Status status = entry.getValue();
+            switch (response.getNodeId().getId()) {
+                case "1":
+                    assertEquals(Node.Status.CONNECTED, status);
+                    break;
+                case "2":
+                case "3":
+                case "4":
+                    assertEquals(Node.Status.DISCONNECTED, status);
+                    break;
+                default:
+                    fail("Unexpected node id: " + response.getNodeId().getId());
+            }
+        }
+    }
+
+    /**
+     * Builds a NodeResponse backed by a mocked ClientResponse that reports
+     * the given HTTP status code with empty headers and an empty body.
+     */
+    private NodeResponse createNodeResourceResponse(String nodeId, int statusCode) {
+
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        when(clientResponse.getStatus()).thenReturn(statusCode);
+        when(clientResponse.getHeaders()).thenReturn(new MultivaluedMapImpl());
+        when(clientResponse.getEntityInputStream()).thenReturn(new ByteArrayInputStream(new byte[0]));
+
+        NodeIdentifier nodeIdentifier = new NodeIdentifier(nodeId, "localhost", 1, "localhost", 1);
+        return new NodeResponse(nodeIdentifier, "GET", dummyUri, clientResponse, 1L, "111");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
new file mode 100644
index 0000000..7347a94
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import static org.junit.Assert.assertEquals;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.junit.Test;
+
+public class TestWebClusterManager {
+
+    /**
+     * Verifies that status snapshot dates are normalized down to the start
+     * of their 5-minute (300000 ms) bucket.
+     */
+    @Test
+    public void testNormalizedStatusSnapshotDate() throws ParseException {
+        // pattern must use "ss" (second-in-minute); the previous "SS"
+        // (millisecond) caused lenient parsing to drop the seconds field,
+        // producing incorrect Date values
+        final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+        final Date date1 = df.parse("2014/01/01 00:00:00.000");
+        final Date date2 = df.parse("2014/01/01 00:04:59.999");
+        final Date date3 = df.parse("2014/01/01 00:05:00.000");
+        final Date date4 = df.parse("2014/01/01 00:05:00.001");
+
+        // start of a bucket maps to itself
+        final Date normalized1 = WebClusterManager.normalizeStatusSnapshotDate(date1, 300000);
+        assertEquals(date1, normalized1);
+
+        // last instant of the bucket still maps to the bucket start
+        final Date normalized2 = WebClusterManager.normalizeStatusSnapshotDate(date2, 300000);
+        assertEquals(date1, normalized2);
+
+        // the next bucket boundary maps to itself
+        final Date normalized3 = WebClusterManager.normalizeStatusSnapshotDate(date3, 300000);
+        assertEquals(date3, normalized3);
+
+        // one ms past the boundary maps back to that boundary
+        final Date normalized4 = WebClusterManager.normalizeStatusSnapshotDate(date4, 300000);
+        assertEquals(date3, normalized4);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
new file mode 100644
index 0000000..35380dd
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.testutils;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MediaType;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Encapsulates an HTTP request. The toString method returns the
+ * specification-compliant request.
+ *
+ * @author unattributed
+ */
+public class HttpRequest {
+
+    private String method;
+    private String uri;
+    private String rawUri;
+    private String version;
+    private String body;
+    private String rawRequest;
+    private Map<String, String> headers = new HashMap<>();
+    private Map<String, List<String>> parameters = new HashMap<>();
+
+    public static HttpRequestBuilder createFromRequestLine(final String requestLine) {
+        return new HttpRequestBuilder(requestLine);
+    }
+
+    public String getBody() {
+        return body;
+    }
+
+    public Map<String, String> getHeaders() {
+        return Collections.unmodifiableMap(headers);
+    }
+
+    public String getHeaderValue(final String header) {
+        for (final Map.Entry<String, String> entry : getHeaders().entrySet()) {
+            if (entry.getKey().equalsIgnoreCase(header)) {
+                return entry.getValue();
+            }
+        }
+        return null;
+    }
+
+    public String getMethod() {
+        return method;
+    }
+
+    public Map<String, List<String>> getParameters() {
+        final Map<String, List<String>> result = new HashMap<>();
+        for (final Map.Entry<String, List<String>> entry : parameters.entrySet()) {
+            result.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+        }
+        return Collections.unmodifiableMap(result);
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public String getRawUri() {
+        return rawUri;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    @Override
+    public String toString() {
+        return rawRequest;
+    }
+
+    /**
+     * A builder for constructing basic HTTP requests. It handles only enough of
+     * the HTTP specification to support basic unit testing, and it should not
+     * be used otherwise.
+     */
+    public static class HttpRequestBuilder {
+
+        private String method;
+        private String uri;
+        private String rawUri;
+        private String version;
+        private Map<String, String> headers = new HashMap<>();
+        private Map<String, List<String>> parameters = new HashMap<>();
+        private int contentLength = 0;
+        private String contentType;
+        private String body = "";
+        private StringBuilder rawRequest = new StringBuilder();
+
+        private HttpRequestBuilder(final String requestLine) {
+
+            final String[] tokens = requestLine.split(" ");
+            if (tokens.length != 3) {
+                throw new IllegalArgumentException("Invalid HTTP Request Line: " + requestLine);
+            }
+
+            method = tokens[0];
+            if (HttpMethod.GET.equalsIgnoreCase(method) || HttpMethod.HEAD.equalsIgnoreCase(method) || HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.OPTIONS.equalsIgnoreCase(method)) {
+                final int queryIndex = tokens[1].indexOf("?");
+                if (queryIndex > -1) {
+                    uri = tokens[1].substring(0, queryIndex);
+                    addParameters(tokens[1].substring(queryIndex + 1));
+                } else {
+                    uri = tokens[1];
+                }
+            }
+            rawUri = tokens[1];
+            version = tokens[2];
+            rawRequest.append(requestLine).append("\n");
+        }
+
+        private void addHeader(final String key, final String value) {
+            if (key.contains(" ")) {
+                throw new IllegalArgumentException("Header key may not contain spaces.");
+            } else if ("content-length".equalsIgnoreCase(key)) {
+                contentLength = (StringUtils.isBlank(value.trim())) ? 0 : Integer.parseInt(value.trim());
+            } else if ("content-type".equalsIgnoreCase(key)) {
+                contentType = value.trim();
+            }
+            headers.put(key, value);
+        }
+
+        public void addHeader(final String header) {
+            final int firstColonIndex = header.indexOf(":");
+            if (firstColonIndex < 0) {
+                throw new IllegalArgumentException("Invalid HTTP Header line: " + header);
+            }
+            addHeader(header.substring(0, firstColonIndex), header.substring(firstColonIndex + 1));
+            rawRequest.append(header).append("\n");
+        }
+
+        // final because constructor calls it
+        public final void addParameters(final String queryString) {
+
+            if (StringUtils.isBlank(queryString)) {
+                return;
+            }
+
+            final String normQueryString;
+            if (queryString.startsWith("?")) {
+                normQueryString = queryString.substring(1);
+            } else {
+                normQueryString = queryString;
+            }
+            final String[] keyValuePairs = normQueryString.split("&");
+            for (final String keyValuePair : keyValuePairs) {
+                final String[] keyValueTokens = keyValuePair.split("=");
+                try {
+                    addParameter(
+                            URLDecoder.decode(keyValueTokens[0], "utf-8"),
+                            URLDecoder.decode(keyValueTokens[1], "utf-8")
+                    );
+                } catch (UnsupportedEncodingException use) {
+                    throw new RuntimeException(use);
+                }
+            }
+        }
+
+        public void addParameter(final String key, final String value) {
+
+            if (key.contains(" ")) {
+                throw new IllegalArgumentException("Parameter key may not contain spaces: " + key);
+            }
+
+            final List<String> values;
+            if (parameters.containsKey(key)) {
+                values = parameters.get(key);
+            } else {
+                values = new ArrayList<>();
+                parameters.put(key, values);
+            }
+            values.add(value);
+        }
+
+        public void addBody(final Reader reader) throws IOException {
+
+            if (contentLength <= 0) {
+                return;
+            }
+
+            final char[] buf = new char[contentLength];
+            int offset = 0;
+            int numRead = 0;
+            while (offset < buf.length && (numRead = reader.read(buf, offset, buf.length - offset)) >= 0) {
+                offset += numRead;
+            }
+            body = new String(buf);
+            rawRequest.append("\n");
+            rawRequest.append(body);
+        }
+
+        public HttpRequest build() throws UnsupportedEncodingException {
+
+            if (HttpMethod.GET.equalsIgnoreCase(method) == false && HttpMethod.HEAD.equalsIgnoreCase(method) == false && contentType.equalsIgnoreCase(MediaType.APPLICATION_FORM_URLENCODED)) {
+                addParameters(body);
+            }
+
+            final HttpRequest request = new HttpRequest();
+            request.method = this.method;
+            request.uri = this.uri;
+            request.rawUri = this.rawUri;
+            request.version = this.version;
+            request.headers.putAll(this.headers);
+            request.parameters.putAll(this.parameters);
+            request.body = this.body;
+            request.rawRequest = this.rawRequest.toString();
+
+            return request;
+        }
+
+    }
+}