You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/19 19:15:46 UTC

[12/59] [abbrv] [partial] incubator-nifi git commit: Reworked overall directory structure to make releasing nifi vs maven plugis easier

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
deleted file mode 100644
index 84565da..0000000
--- a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.node;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.nifi.cluster.HeartbeatPayload;
-import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Represents a connected flow controller. Nodes always have an immutable
- * identifier and a status. The status may be changed, but never null.
- *
- * A Node may be cloned, but the cloning is a shallow copy of the instance.
- *
- * This class overrides hashCode and equals and considers two instances to be
- * equal if they have the equal NodeIdentifiers.
- *
- * @author unattributed
- */
-public class Node implements Cloneable, Comparable<Node> {
-
-    private static final Logger lockLogger = LoggerFactory.getLogger("cluster.lock");
-
-    /**
-     * The semantics of a Node status are as follows:
-     * <ul>
-     * <li>CONNECTED -- a flow controller that is connected to the cluster. A
-     * connecting node transitions to connected after the cluster receives the
-     * flow controller's first heartbeat. A connected node can transition to
-     * disconnecting.</li>
-     * <li>CONNECTING -- a flow controller has issued a connection request to
-     * the cluster, but has not yet sent a heartbeat. A connecting node can
-     * transition to disconnecting or connected. The cluster will not accept any
-     * external requests to change the flow while any node is connecting.</li>
-     * <li>DISCONNECTED -- a flow controller that is not connected to the
-     * cluster. A disconnected node can transition to connecting.</li>
-     * <li>DISCONNECTING -- a flow controller that is in the process of
-     * disconnecting from the cluster. A disconnecting node will always
-     * transition to disconnected.</li>
-     * </ul>
-     */
-    public static enum Status {
-
-        CONNECTED,
-        CONNECTING,
-        DISCONNECTED,
-        DISCONNECTING
-    }
-
-    /**
-     * the node's unique identifier
-     */
-    private final NodeIdentifier nodeId;
-
-    /**
-     * the node statue
-     */
-    private Status status;
-
-    /**
-     * the last heartbeat received by from the node
-     */
-    private Heartbeat lastHeartbeat;
-
-    /**
-     * the payload of the last heartbeat received from the node
-     */
-    private HeartbeatPayload lastHeartbeatPayload;
-
-    /**
-     * the last time the connection for this node was requested
-     */
-    private AtomicLong connectionRequestedTimestamp = new AtomicLong(0L);
-
-    /**
-     * a flag to indicate this node was disconnected because of a lack of
-     * heartbeat
-     */
-    private boolean heartbeatDisconnection;
-
-    public Node(final NodeIdentifier id, final Status status) {
-        if (id == null) {
-            throw new IllegalArgumentException("ID may not be null.");
-        } else if (status == null) {
-            throw new IllegalArgumentException("Status may not be null.");
-        }
-        this.nodeId = id;
-        this.status = status;
-    }
-
-    public NodeIdentifier getNodeId() {
-        return nodeId;
-    }
-
-    /**
-     * Returns the last received heartbeat or null if no heartbeat has been set.
-     *
-     * @return a heartbeat or null
-     */
-    public Heartbeat getHeartbeat() {
-        return lastHeartbeat;
-    }
-
-    public HeartbeatPayload getHeartbeatPayload() {
-        return lastHeartbeatPayload;
-    }
-
-    /**
-     * Sets the last heartbeat received.
-     *
-     * @param heartbeat a heartbeat
-     *
-     * @throws ProtocolException if the heartbeat's payload failed unmarshalling
-     */
-    public void setHeartbeat(final Heartbeat heartbeat) throws ProtocolException {
-        this.lastHeartbeat = heartbeat;
-        if (this.lastHeartbeat == null) {
-            this.lastHeartbeatPayload = null;
-        } else {
-            final byte[] payload = lastHeartbeat.getPayload();
-            if (payload == null || payload.length == 0) {
-                this.lastHeartbeatPayload = null;
-            } else {
-                this.lastHeartbeatPayload = HeartbeatPayload.unmarshal(payload);
-            }
-        }
-    }
-
-    /**
-     * Returns the time of the last received connection request for this node.
-     *
-     * @return the time when the connection request for this node was received.
-     */
-    public long getConnectionRequestedTimestamp() {
-        return connectionRequestedTimestamp.get();
-    }
-
-    /**
-     * Sets the time when the connection request for this node was last
-     * received.
-     *
-     * This method is thread-safe and may be called without obtaining any lock.
-     *
-     * @param connectionRequestedTimestamp
-     */
-    public void setConnectionRequestedTimestamp(long connectionRequestedTimestamp) {
-        this.connectionRequestedTimestamp.set(connectionRequestedTimestamp);
-    }
-
-    /**
-     * Returns true if the node was disconnected due to lack of heartbeat; false
-     * otherwise.
-     *
-     * @return true if the node was disconnected due to lack of heartbeat; false
-     * otherwise.
-     */
-    public boolean isHeartbeatDisconnection() {
-        return heartbeatDisconnection;
-    }
-
-    /**
-     * Sets the status to disconnected and flags the node as being disconnected
-     * by lack of heartbeat.
-     */
-    public void setHeartbeatDisconnection() {
-        setStatus(Status.DISCONNECTED);
-        heartbeatDisconnection = true;
-    }
-
-    /**
-     * @return the status
-     */
-    public Status getStatus() {
-        return status;
-    }
-
-    /**
-     * @param status a status
-     */
-    public void setStatus(final Status status) {
-        if (status == null) {
-            throw new IllegalArgumentException("Status may not be null.");
-        }
-        this.status = status;
-        heartbeatDisconnection = false;
-    }
-
-    @Override
-    public Node clone() {
-        final Node clone = new Node(nodeId, status);
-        clone.lastHeartbeat = lastHeartbeat;
-        clone.lastHeartbeatPayload = lastHeartbeatPayload;
-        clone.heartbeatDisconnection = heartbeatDisconnection;
-        clone.connectionRequestedTimestamp = connectionRequestedTimestamp;
-        return clone;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj == null) {
-            return false;
-        }
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-        final Node other = (Node) obj;
-        if (this.nodeId != other.nodeId && (this.nodeId == null || !this.nodeId.equals(other.nodeId))) {
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        int hash = 7;
-        hash = 53 * hash + (this.nodeId != null ? this.nodeId.hashCode() : 0);
-        return hash;
-    }
-
-    @Override
-    public String toString() {
-        return nodeId.toString();
-    }
-
-    @Override
-    public int compareTo(final Node o) {
-        if (o == null) {
-            return -1;
-        }
-        return getNodeId().getId().compareTo(o.getNodeId().getId());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
deleted file mode 100644
index e26d196..0000000
--- a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.spring;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery;
-import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.FactoryBean;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-
-/**
- * Factory bean for creating a singleton ClusterManagerProtocolServiceLocator
- * instance. If the application is configured to act as the cluster manager,
- * then null is always returned as the created instance.
- *
- * The cluster manager protocol service represents the socket endpoint for
- * sending internal socket messages to the cluster manager.
- */
-public class ClusterManagerProtocolServiceLocatorFactoryBean implements FactoryBean, ApplicationContextAware, DisposableBean {
-
-    private ApplicationContext applicationContext;
-
-    private ClusterServiceLocator locator;
-
-    private NiFiProperties properties;
-
-    @Override
-    public Object getObject() throws Exception {
-        /*
-         * If configured for the cluster manager, then the service locator is never used.  
-         */
-        if (properties.isClusterManager()) {
-            return null;
-        } else if (locator == null) {
-
-            if (properties.getClusterProtocolUseMulticast()) {
-
-                // get the service discovery instance
-                final ClusterServiceDiscovery serviceDiscovery = applicationContext.getBean("clusterManagerProtocolServiceDiscovery", ClusterServiceDiscovery.class);
-
-                // create service location configuration
-                final ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
-                config.setNumAttempts(properties.getClusterProtocolMulticastServiceLocatorAttempts());
-
-                final int delay = (int) FormatUtils.getTimeDuration(properties.getClusterProtocolMulticastServiceLocatorAttemptsDelay(), TimeUnit.SECONDS);
-                config.setTimeBetweenAttempts(delay);
-                config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
-
-                locator = new ClusterServiceLocator(serviceDiscovery);
-                locator.setAttemptsConfig(config);
-
-            } else {
-                final String serviceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class);
-                final InetSocketAddress serviceAddress = properties.getClusterNodeUnicastManagerProtocolAddress();
-                final DiscoverableService service = new DiscoverableServiceImpl(serviceName, serviceAddress);
-                locator = new ClusterServiceLocator(service);
-            }
-
-            // start the locator
-            locator.start();
-
-        }
-        return locator;
-
-    }
-
-    @Override
-    public Class getObjectType() {
-        return ClusterServiceLocator.class;
-    }
-
-    @Override
-    public boolean isSingleton() {
-        return true;
-    }
-
-    @Override
-    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-        this.applicationContext = applicationContext;
-    }
-
-    @Override
-    public void destroy() throws Exception {
-        if (locator != null && locator.isRunning()) {
-            locator.stop();
-        }
-    }
-
-    public void setProperties(NiFiProperties properties) {
-        this.properties = properties;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
deleted file mode 100644
index ef72298..0000000
--- a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.spring;
-
-import java.io.File;
-import org.apache.nifi.cluster.firewall.impl.FileBasedClusterNodeFirewall;
-import org.apache.nifi.util.NiFiProperties;
-import org.springframework.beans.factory.FactoryBean;
-
-/**
- * Factory bean for creating a singleton FileBasedClusterNodeFirewall instance.
- */
-public class FileBasedClusterNodeFirewallFactoryBean implements FactoryBean {
-
-    private FileBasedClusterNodeFirewall firewall;
-
-    private NiFiProperties properties;
-
-    @Override
-    public Object getObject() throws Exception {
-        if (firewall == null) {
-            final File config = properties.getClusterManagerNodeFirewallFile();
-            final File restoreDirectory = properties.getRestoreDirectory();
-            if (config != null) {
-                firewall = new FileBasedClusterNodeFirewall(config, restoreDirectory);
-            }
-        }
-        return firewall;
-    }
-
-    @Override
-    public Class getObjectType() {
-        return FileBasedClusterNodeFirewall.class;
-    }
-
-    @Override
-    public boolean isSingleton() {
-        return true;
-    }
-
-    public void setProperties(NiFiProperties properties) {
-        this.properties = properties;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
deleted file mode 100644
index 7169730..0000000
--- a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.spring;
-
-import java.nio.file.Paths;
-import org.apache.nifi.admin.service.AuditService;
-import org.apache.nifi.cluster.event.EventManager;
-import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
-import org.apache.nifi.cluster.flow.DataFlowManagementService;
-import org.apache.nifi.cluster.manager.HttpRequestReplicator;
-import org.apache.nifi.cluster.manager.HttpResponseMapper;
-import org.apache.nifi.cluster.manager.impl.WebClusterManager;
-import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
-import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
-import org.apache.nifi.controller.service.ControllerServiceLoader;
-import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import org.apache.nifi.util.NiFiProperties;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.FactoryBean;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-
-/**
- * Factory bean for creating a singleton WebClusterManager instance. If the
- * application is not configured to act as the cluster manager, then null is
- * always returned as the created instance.
- */
-public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationContextAware {
-
-    private ApplicationContext applicationContext;
-
-    private WebClusterManager clusterManager;
-
-    private NiFiProperties properties;
-
-    private StringEncryptor encryptor;
-
-    @Override
-    public Object getObject() throws Exception {
-        if (properties.isClusterManager() && properties.isNode()) {
-            throw new IllegalStateException("Application may be configured as a cluster manager or a node, but not both.");
-        } else if (!properties.isClusterManager()) {
-            /*
-             * If not configured for the cluster manager, then the cluster manager is never used.  
-             * null is returned so that we don't instantiate a thread pool or other resources. 
-             */
-            return null;
-        } else if (clusterManager == null) {
-
-            // get the service configuration path (fail early)
-            final String serviceConfigurationFile = properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE);
-            if (serviceConfigurationFile == null) {
-                throw new NullPointerException("The service configuration file has not been specified.");
-            }
-
-            final HttpRequestReplicator requestReplicator = applicationContext.getBean("httpRequestReplicator", HttpRequestReplicator.class);
-            final HttpResponseMapper responseMapper = applicationContext.getBean("httpResponseMapper", HttpResponseMapper.class);
-            final DataFlowManagementService dataFlowService = applicationContext.getBean("dataFlowManagementService", DataFlowManagementService.class);
-            final ClusterManagerProtocolSenderListener senderListener = applicationContext.getBean("clusterManagerProtocolSenderListener", ClusterManagerProtocolSenderListener.class);
-
-            // create the manager
-            clusterManager = new WebClusterManager(
-                    requestReplicator,
-                    responseMapper,
-                    dataFlowService,
-                    senderListener,
-                    properties,
-                    encryptor
-            );
-
-            // set the service broadcaster
-            if (properties.getClusterProtocolUseMulticast()) {
-
-                // create broadcaster
-                final ClusterServicesBroadcaster broadcaster = applicationContext.getBean("clusterServicesBroadcaster", ClusterServicesBroadcaster.class);
-
-                // register the cluster manager protocol service
-                final String clusterManagerProtocolServiceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class);
-                final DiscoverableService clusterManagerProtocolService = new DiscoverableServiceImpl(clusterManagerProtocolServiceName, properties.getClusterManagerProtocolAddress());
-                broadcaster.addService(clusterManagerProtocolService);
-
-                clusterManager.setServicesBroadcaster(broadcaster);
-            }
-
-            // set the event manager
-            clusterManager.setEventManager(applicationContext.getBean("nodeEventHistoryManager", EventManager.class));
-
-            // set the cluster firewall
-            clusterManager.setClusterFirewall(applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class));
-
-            // set the audit service
-            clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class));
-
-            // load the controller services
-            final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile));
-            serviceLoader.loadControllerServices(clusterManager);
-        }
-        return clusterManager;
-    }
-
-    @Override
-    public Class getObjectType() {
-        return WebClusterManager.class;
-    }
-
-    @Override
-    public boolean isSingleton() {
-        return true;
-    }
-
-    @Override
-    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-        this.applicationContext = applicationContext;
-    }
-
-    public void setProperties(NiFiProperties properties) {
-        this.properties = properties;
-    }
-
-    public void setEncryptor(final StringEncryptor encryptor) {
-        this.encryptor = encryptor;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
deleted file mode 100644
index 1ed5b30..0000000
--- a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.reporting;
-
-import org.apache.nifi.cluster.manager.impl.ClusteredReportingContext;
-import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.EventAccess;
-import org.apache.nifi.reporting.ReportingContext;
-import org.apache.nifi.reporting.ReportingTask;
-
-public class ClusteredReportingTaskNode extends AbstractReportingTaskNode {
-
-    private final EventAccess eventAccess;
-    private final BulletinRepository bulletinRepository;
-    private final ControllerServiceProvider serviceProvider;
-
-    public ClusteredReportingTaskNode(final ReportingTask reportingTask, final String id, final ProcessScheduler scheduler,
-            final EventAccess eventAccess, final BulletinRepository bulletinRepository, final ControllerServiceProvider serviceProvider,
-            final ValidationContextFactory validationContextFactory) {
-        super(reportingTask, id, serviceProvider, scheduler, validationContextFactory);
-
-        this.eventAccess = eventAccess;
-        this.bulletinRepository = bulletinRepository;
-        this.serviceProvider = serviceProvider;
-    }
-
-    @Override
-    public ReportingContext getReportingContext() {
-        return new ClusteredReportingContext(eventAccess, bulletinRepository, getProperties(), serviceProvider);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml b/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml
deleted file mode 100644
index 68c29bc..0000000
--- a/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ /dev/null
@@ -1,124 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<!-- marked as lazy so that clustering beans are not created when applications runs in non-clustered mode -->
-<beans default-lazy-init="true"
-       xmlns="http://www.springframework.org/schema/beans"
-       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xmlns:util="http://www.springframework.org/schema/util"
-       xmlns:context="http://www.springframework.org/schema/context"
-       xmlns:aop="http://www.springframework.org/schema/aop"
-       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
-    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
-    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
-    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd">
-
-    <!-- jersey client -->
-    <bean id="jersey-client" class="org.apache.nifi.web.util.WebUtils" factory-method="createClient">
-        <constructor-arg>
-            <bean class="com.sun.jersey.api.client.config.DefaultClientConfig"/>
-        </constructor-arg>
-        <constructor-arg>
-            <bean class="org.apache.nifi.framework.security.util.SslContextFactory" factory-method="createSslContext">
-                <constructor-arg ref="nifiProperties"/>
-            </bean>
-        </constructor-arg>
-    </bean>
-
-    <!-- http request replicator -->
-    <bean id="httpRequestReplicator" class="org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl">
-        <constructor-arg index="0">
-            <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiRequestThreads"/>
-        </constructor-arg>
-        <constructor-arg ref="jersey-client" index="1"/>
-        <constructor-arg index="2">
-            <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiConnectionTimeout"/>
-        </constructor-arg>
-        <constructor-arg index="3">
-            <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiReadTimeout"/>
-        </constructor-arg>
-        <property name="nodeProtocolScheme">
-            <bean factory-bean="nifiProperties" factory-method="getClusterProtocolManagerToNodeApiScheme"/>
-        </property>
-    </bean>
-    
-    <!-- http response mapper -->
-    <bean id="httpResponseMapper" class="org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl"/>
-
-    <!-- cluster flow DAO -->
-    <bean id="dataFlowDao" class="org.apache.nifi.cluster.flow.impl.DataFlowDaoImpl">
-        <constructor-arg index="0">
-            <bean factory-bean="nifiProperties" factory-method="getFlowConfigurationFileDir"/>
-        </constructor-arg>
-        <constructor-arg index="1">
-            <bean factory-bean="nifiProperties" factory-method="getRestoreDirectory"/>
-        </constructor-arg>
-        <constructor-arg index="2">
-            <bean factory-bean="nifiProperties" factory-method="getAutoResumeState"/>
-        </constructor-arg>
-    </bean>
-    
-    <!-- dataflow management service -->
-    <bean id="dataFlowManagementService" class="org.apache.nifi.cluster.flow.impl.DataFlowManagementServiceImpl">
-        <constructor-arg ref="dataFlowDao"/>
-        <constructor-arg ref="clusterManagerProtocolSender"/>
-        <property name="retrievalDelay">
-            <bean factory-bean="nifiProperties" factory-method="getClusterManagerFlowRetrievalDelay"/>
-        </property>
-    </bean>
-
-    <!-- node event history manager -->
-    <bean id="nodeEventHistoryManager" class="org.apache.nifi.cluster.event.impl.EventManagerImpl">
-        <constructor-arg>
-            <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeEventHistorySize"/>
-        </constructor-arg>
-    </bean>
-
-    <!-- cluster firewall -->
-    <bean id="clusterFirewall" class="org.apache.nifi.cluster.spring.FileBasedClusterNodeFirewallFactoryBean">
-        <property name="properties" ref="nifiProperties"/>
-    </bean>
-
-    <!-- cluster manager -->
-    <bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean">
-        <property name="properties" ref="nifiProperties"/>
-        <property name="encryptor" ref="stringEncryptor"/>
-    </bean>
-    
-    <!-- discoverable services -->
-    
-    <!-- cluster manager protocol discoverable service -->
-    
-    <!-- service name for communicating with the cluster manager using sockets -->
-    <bean id="clusterManagerProtocolServiceName" class="java.lang.String">
-        <constructor-arg value="cluster-manager-protocol" />
-    </bean>
-    
-    <!-- cluster manager protocol service discovery -->
-    <bean id="clusterManagerProtocolServiceDiscovery" class="org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery">
-        <constructor-arg ref="clusterManagerProtocolServiceName" index="0"/>
-        <constructor-arg index="1">
-            <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastAddress"/>
-        </constructor-arg>
-        <constructor-arg ref="protocolMulticastConfiguration" index="2"/>
-        <constructor-arg ref="protocolContext" index="3"/>
-    </bean>
-    
-    <!-- cluster manager protocol service locator -->
-    <bean id="clusterManagerProtocolServiceLocator" class="org.apache.nifi.cluster.spring.ClusterManagerProtocolServiceLocatorFactoryBean">
-        <property name="properties" ref="nifiProperties"/>
-    </bean>
-        
-</beans>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java
deleted file mode 100644
index 09ea44b..0000000
--- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.event.impl;
-
-import org.apache.nifi.cluster.event.impl.EventManagerImpl;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.nifi.cluster.event.Event;
-import org.apache.nifi.cluster.event.Event.Category;
-import org.apache.nifi.cluster.event.EventManager;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-/**
- * @author unattributed
- */
-public class EventManagerImplTest {
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNonPositiveHistorySize() {
-        new EventManagerImpl(0);
-    }
-
-    @Test
-    public void testGetEventsUnknownSource() {
-        EventManager manager = new EventManagerImpl(1);
-        assertEquals(Collections.EMPTY_LIST, manager.getEvents("unknown value"));
-    }
-
-    @Test
-    public void testGetEvents() {
-
-        EventManager manager = new EventManagerImpl(2);
-
-        Event e1 = new Event("1", "Event1", Category.INFO, 0);
-        Event e2 = new Event("1", "Event2", Category.INFO, 1);
-
-        manager.addEvent(e1);
-        manager.addEvent(e2);
-
-        List<Event> events = manager.getEvents("1");
-
-        // assert newest to oldest
-        assertEquals(Arrays.asList(e2, e1), events);
-    }
-
-    @Test
-    public void testGetMostRecentEventUnknownSource() {
-        EventManager manager = new EventManagerImpl(1);
-        assertNull(manager.getMostRecentEvent("unknown value"));
-    }
-
-    @Test
-    public void testGetMostRecentEvent() {
-
-        EventManager manager = new EventManagerImpl(2);
-
-        Event e1 = new Event("1", "Event1", Category.INFO, 0);
-        Event e2 = new Event("1", "Event2", Category.INFO, 1);
-
-        manager.addEvent(e1);
-        manager.addEvent(e2);
-
-        // assert newest to oldest
-        assertEquals(e2, manager.getMostRecentEvent("1"));
-    }
-
-    @Test
-    public void testAddEventExceedsHistorySize() {
-
-        EventManager manager = new EventManagerImpl(1);
-
-        Event e1 = new Event("1", "Event1", Category.INFO, 0);
-        Event e2 = new Event("1", "Event2", Category.INFO, 1);
-
-        manager.addEvent(e1);
-        manager.addEvent(e2);
-
-        List<Event> events = manager.getEvents("1");
-
-        // assert oldest evicted
-        assertEquals(Arrays.asList(e2), events);
-
-    }
-
-    @Test
-    public void testClearHistory() {
-
-        EventManager manager = new EventManagerImpl(1);
-
-        Event e1 = new Event("1", "Event1", Category.INFO, 0);
-        Event e2 = new Event("1", "Event2", Category.INFO, 1);
-
-        manager.addEvent(e1);
-        manager.addEvent(e2);
-
-        manager.clearEventHistory("1");
-
-        // assert oldest evicted
-        assertTrue(manager.getEvents("1").isEmpty());
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
deleted file mode 100644
index e5db7ca..0000000
--- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.firewall.impl;
-
-import java.io.File;
-import java.io.IOException;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import static org.junit.Assert.*;
-import org.junit.Test;
-
-public class FileBasedClusterNodeFirewallTest {
-
-    private FileBasedClusterNodeFirewall ipsFirewall;
-
-    private FileBasedClusterNodeFirewall acceptAllFirewall;
-
-    private File ipsConfig;
-
-    private File emptyConfig;
-
-    private File restoreDirectory;
-
-    @Before
-    public void setup() throws Exception {
-
-        ipsConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt");
-        emptyConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt");
-
-        restoreDirectory = new File(System.getProperty("java.io.tmpdir") + "/firewall_restore");
-
-        ipsFirewall = new FileBasedClusterNodeFirewall(ipsConfig, restoreDirectory);
-        acceptAllFirewall = new FileBasedClusterNodeFirewall(emptyConfig);
-    }
-
-    @After
-    public void teardown() throws IOException {
-        deleteFile(restoreDirectory);
-    }
-
-    @Test
-    public void testSyncWithRestore() {
-        assertEquals(ipsConfig.length(), new File(restoreDirectory, ipsConfig.getName()).length());
-    }
-
-    @Test
-    public void testIsPermissibleWithExactMatch() {
-        assertTrue(ipsFirewall.isPermissible("2.2.2.2"));
-    }
-
-    @Test
-    public void testIsPermissibleWithSubnetMatch() {
-        assertTrue(ipsFirewall.isPermissible("3.3.3.255"));
-    }
-
-    @Test
-    public void testIsPermissibleWithNoMatch() {
-        assertFalse(ipsFirewall.isPermissible("255.255.255.255"));
-    }
-
-    @Test
-    public void testIsPermissibleWithMalformedData() {
-        assertFalse(ipsFirewall.isPermissible("abc"));
-    }
-
-    @Test
-    public void testIsPermissibleWithEmptyConfig() {
-        assertTrue(acceptAllFirewall.isPermissible("1.1.1.1"));
-    }
-
-    @Test
-    public void testIsPermissibleWithEmptyConfigWithMalformedData() {
-        assertTrue(acceptAllFirewall.isPermissible("abc"));
-    }
-
-    private boolean deleteFile(final File file) {
-        if (file.isDirectory()) {
-            FileUtils.deleteFilesInDir(file, null, null, true, true);
-        }
-        return FileUtils.deleteFile(file, null, 10);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
deleted file mode 100644
index f9ba016..0000000
--- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow.impl;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import org.apache.nifi.cluster.flow.DataFlowDao;
-import org.apache.nifi.cluster.flow.PersistedFlowState;
-import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl;
-import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.io.socket.ServerSocketConfiguration;
-import org.apache.nifi.io.socket.SocketConfiguration;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.xml.sax.SAXException;
-
-/**
- * @author unattributed
- */
-public class DataFlowManagementServiceImplTest {
-
-    private DataFlowManagementServiceImpl service;
-    private File restoreLocation;
-    private File primaryLocation;
-    private DataFlowDao dao;
-    private int apiDummyPort;
-    private int socketPort;
-    private SocketConfiguration socketConfig;
-    private ClusterManagerProtocolSender sender;
-    private ServerSocketConfiguration serverSocketConfig;
-    private SocketProtocolListener listener;
-
-    @Before
-    public void setup() throws IOException {
-
-        primaryLocation = new File(System.getProperty("java.io.tmpdir") + "/primary" + this.getClass().getSimpleName());
-        restoreLocation = new File(System.getProperty("java.io.tmpdir") + "/restore" + this.getClass().getSimpleName());
-
-        FileUtils.deleteDirectory(primaryLocation);
-        FileUtils.deleteDirectory(restoreLocation);
-
-        ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
-        socketConfig = new SocketConfiguration();
-        socketConfig.setSocketTimeout(1000);
-        serverSocketConfig = new ServerSocketConfiguration();
-
-        dao = new DataFlowDaoImpl(primaryLocation, restoreLocation, false);
-
-        sender = new ClusterManagerProtocolSenderImpl(socketConfig, protocolContext);
-
-        service = new DataFlowManagementServiceImpl(dao, sender);
-        service.start();
-
-        listener = new SocketProtocolListener(1, 0, serverSocketConfig, protocolContext);
-        listener.start();
-
-        apiDummyPort = 7777;
-        socketPort = listener.getPort();
-    }
-
-    @After
-    public void teardown() throws IOException {
-
-        if (service != null && service.isRunning()) {
-            service.stop();
-        }
-
-        if (listener != null && listener.isRunning()) {
-            try {
-                listener.stop();
-            } catch (final Exception ex) {
-                ex.printStackTrace(System.out);
-            }
-        }
-        FileUtils.deleteDirectory(primaryLocation);
-        FileUtils.deleteDirectory(restoreLocation);
-
-    }
-
-    @Test
-    public void testLoadFlowWithNonExistentFlow() throws ParserConfigurationException, SAXException, IOException {
-        verifyFlow();
-    }
-
-    @Test
-    public void testLoadFlowWithNonExistentFlowWhenServiceStopped() throws IOException, SAXException, ParserConfigurationException {
-        service.stop();
-        verifyFlow();
-    }
-
-    private void verifyFlow() throws ParserConfigurationException, SAXException, IOException {
-        final byte[] flowBytes = service.loadDataFlow().getDataFlow().getFlow();
-        final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
-        final Document doc = docBuilder.parse(new ByteArrayInputStream(flowBytes));
-        final Element controller = (Element) doc.getElementsByTagName("flowController").item(0);
-        final Element rootGroup = (Element) controller.getElementsByTagName("rootGroup").item(0);
-        final String rootGroupName = rootGroup.getElementsByTagName("name").item(0).getTextContent();
-        assertEquals("NiFi Flow", rootGroupName);
-    }
-
-    @Test
-    public void testLoadFlowSingleNode() throws Exception {
-        String flowStr = "<rootGroup />";
-        byte[] flowBytes = flowStr.getBytes();
-        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
-
-        NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort);
-        service.setNodeIds(new HashSet<>(Arrays.asList(nodeId)));
-        service.setPersistedFlowState(PersistedFlowState.STALE);
-
-        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
-        // sleep long enough for the flow retriever to run
-        waitForState(PersistedFlowState.CURRENT);
-
-        assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
-        assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
-
-    }
-
-    @Test
-    public void testLoadFlowWithSameNodeIds() throws Exception {
-
-        String flowStr = "<rootGroup />";
-        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
-
-        NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort);
-        NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
-        service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
-        service.setPersistedFlowState(PersistedFlowState.STALE);
-
-        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
-        // sleep long enough for the flow retriever to run
-        waitForState(PersistedFlowState.CURRENT);
-
-        // verify that flow is current
-        assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
-
-        // add same ids in different order
-        service.setNodeIds(new HashSet<>(Arrays.asList(nodeId2, nodeId1)));
-
-        // verify flow is still current
-        assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
-
-    }
-
-    @Test
-    public void testLoadFlowWithABadNode() throws Exception {
-
-        String flowStr = "<rootGroup />";
-        byte[] flowBytes = flowStr.getBytes();
-        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
-
-        NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1);
-        NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
-        service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
-        service.setPersistedFlowState(PersistedFlowState.STALE);
-
-        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
-        // sleep long enough for the flow retriever to run
-        waitForState(PersistedFlowState.CURRENT);
-
-        assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
-        assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
-
-    }
-
-    @Test
-    public void testLoadFlowWithConstantNodeIdChanging() throws Exception {
-        String flowStr = "<rootGroup />";
-        byte[] flowBytes = flowStr.getBytes();
-        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
-
-        NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1);
-        NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
-
-        for (int i = 0; i < 1000; i++) {
-            service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
-            service.setPersistedFlowState(PersistedFlowState.STALE);
-            assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-        }
-
-        // sleep long enough for the flow retriever to run
-        waitForState(PersistedFlowState.CURRENT);
-
-        assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
-        assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
-    }
-
-    @Test
-    public void testLoadFlowWithConstantNodeIdChangingWithRetrievalDelay() throws Exception {
-
-        String flowStr = "<rootGroup />";
-        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
-
-        NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1);
-        NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
-
-        service.setRetrievalDelay("5 sec");
-        for (int i = 0; i < 1000; i++) {
-            service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
-            service.setPersistedFlowState(PersistedFlowState.STALE);
-            assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-        }
-
-        // sleep long enough for the flow retriever to run
-        waitForState(PersistedFlowState.STALE);
-
-        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
-    }
-
-    @Test
-    public void testStopRequestedWhileRetrieving() throws Exception {
-
-        String flowStr = "<rootGroup />";
-        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
-        Set<NodeIdentifier> nodeIds = new HashSet<>();
-        for (int i = 0; i < 1000; i++) {
-            nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1));
-        }
-        nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort));
-
-        long lastRetrievalTime = service.getLastRetrievalTime();
-
-        service.setNodeIds(nodeIds);
-        service.setPersistedFlowState(PersistedFlowState.STALE);
-        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
-        // sleep long enough for the flow retriever to run
-        waitForState(PersistedFlowState.STALE);
-
-        service.stop();
-
-        service.setPersistedFlowState(PersistedFlowState.STALE);
-        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
-        assertEquals(lastRetrievalTime, service.getLastRetrievalTime());
-
-    }
-
-    @Test
-    public void testLoadFlowUnknownState() throws Exception {
-
-        String flowStr = "<rootGroup />";
-        byte[] flowBytes = flowStr.getBytes();
-        listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
-        NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort);
-
-        service.setNodeIds(new HashSet<>(Arrays.asList(nodeId)));
-        service.setPersistedFlowState(PersistedFlowState.STALE);
-        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
-        service.setPersistedFlowState(PersistedFlowState.UNKNOWN);
-
-        assertEquals(PersistedFlowState.UNKNOWN, service.getPersistedFlowState());
-
-        service.setPersistedFlowState(PersistedFlowState.STALE);
-        assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
-        // sleep long enough for the flow retriever to run
-        waitForState(PersistedFlowState.CURRENT);
-
-        assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
-
-    }
-
-    private class FlowRequestProtocolHandler implements ProtocolHandler {
-
-        private StandardDataFlow dataFlow;
-
-        public FlowRequestProtocolHandler(final StandardDataFlow dataFlow) {
-            this.dataFlow = dataFlow;
-        }
-
-        @Override
-        public boolean canHandle(ProtocolMessage msg) {
-            return true;
-        }
-
-        @Override
-        public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
-            FlowResponseMessage response = new FlowResponseMessage();
-            response.setDataFlow(dataFlow);
-            return response;
-        }
-
-    }
-
-    private void waitForState(PersistedFlowState state) throws InterruptedException {
-        for (int i = 0; i < 30; i++) {
-            if (service.getPersistedFlowState() == state) {
-                break;
-            } else {
-                Thread.sleep(1000);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
deleted file mode 100644
index 0c65aba..0000000
--- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.impl;
-
-import org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MultivaluedMap;
-import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.Iterator;
-import javax.ws.rs.core.StreamingOutput;
-import org.apache.nifi.cluster.manager.testutils.HttpResponse;
-import org.apache.nifi.cluster.manager.testutils.HttpServer;
-import com.sun.jersey.api.client.Client;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.ws.rs.core.Response.Status;
-import org.apache.nifi.cluster.manager.NodeResponse;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.apache.nifi.cluster.manager.testutils.HttpResponseAction;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import static org.junit.Assert.*;
-
-/**
- * @author unattributed
- */
-public class HttpRequestReplicatorImplTest {
-
-    private Client client;
-    private HttpRequestReplicatorImpl replicator;
-    private int executorThreadCount;
-    private int serverThreadCount;
-    private int serverPort;
-    private HttpServer server;
-    private Map<String, List<String>> expectedRequestParameters;
-    private Map<String, String> expectedRequestHeaders;
-    private Map<String, String> expectedResponseHeaders;
-    private Object expectedEntity;
-    private String expectedBody;
-    private URI prototypeUri;
-
-    @Before
-    public void setUp() throws IOException, URISyntaxException {
-
-        executorThreadCount = 5;
-        serverThreadCount = 3;
-
-        client = Client.create();
-
-        replicator = new HttpRequestReplicatorImpl(executorThreadCount, client, "1 sec", "1 sec");
-        replicator.start();
-
-        expectedRequestHeaders = new HashMap<>();
-        expectedRequestHeaders.put("header1", "header value1");
-        expectedRequestHeaders.put("header2", "header value2");
-
-        expectedRequestParameters = new HashMap<>();
-        expectedRequestParameters.put("param1", Arrays.asList("p value1"));
-        expectedRequestParameters.put("param2", Arrays.asList("p value2"));
-
-        expectedResponseHeaders = new HashMap<>();
-        expectedResponseHeaders.put("header1", "header value1");
-        expectedResponseHeaders.put("header2", "header value2");
-
-        expectedEntity = new Entity();
-
-        expectedBody = "some text";
-
-        prototypeUri = new URI("http://prototype.host/path/to/resource");
-
-        server = new HttpServer(serverThreadCount, 0);
-        server.start();
-        serverPort = server.getPort();
-    }
-
-    @After
-    public void teardown() {
-        if (server.isRunning()) {
-            server.stop();
-        }
-        if (replicator.isRunning()) {
-            replicator.stop();
-        }
-    }
-
-    @Test
-    public void testReplicateGetLessNodesThanReplicatorThreads() throws Throwable {
-        testReplicateXXX(executorThreadCount - 1, HttpMethod.GET);
-    }
-
-    @Test
-    public void testReplicateGetMoreNodesThanReplicatorThreads() throws Throwable {
-        testReplicateXXX(executorThreadCount + 1, HttpMethod.GET);
-    }
-
-    @Test
-    public void testReplicateGetWithUnresponsiveNode() throws Throwable {
-
-        // nodes
-        Set<NodeIdentifier> nodeIds = createNodes(2, "localhost", serverPort);
-
-        // response
-        HttpResponse expectedResponse = new HttpResponse(Status.OK, expectedBody);
-
-        // first response normal, second response slow
-        server.addResponseAction(new HttpResponseAction(expectedResponse));
-        server.addResponseAction(new HttpResponseAction(expectedResponse, 3500));
-
-        Set<NodeResponse> responses = replicator.replicate(
-                nodeIds,
-                HttpMethod.GET,
-                prototypeUri,
-                expectedRequestParameters,
-                expectedRequestHeaders);
-
-        assertEquals(nodeIds.size(), responses.size());
-
-        Iterator<NodeResponse> nodeResponseItr = responses.iterator();
-
-        NodeResponse firstResponse = nodeResponseItr.next();
-        NodeResponse secondResponse = nodeResponseItr.next();
-        NodeResponse goodResponse;
-        NodeResponse badResponse;
-        if (firstResponse.hasThrowable()) {
-            goodResponse = secondResponse;
-            badResponse = firstResponse;
-        } else {
-            goodResponse = firstResponse;
-            badResponse = secondResponse;
-        }
-
-        // good response
-        // check status
-        assertEquals(Status.OK.getStatusCode(), goodResponse.getStatus());
-
-        // check entity stream
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ((StreamingOutput) goodResponse.getResponse().getEntity()).write(baos);
-        assertEquals("some text", new String(baos.toByteArray()));
-
-        // bad response
-        assertTrue(badResponse.hasThrowable());
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), badResponse.getStatus());
-
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testReplicateGetWithEntity() throws Throwable {
-        testReplicateXXXEntity(HttpMethod.GET);
-    }
-
-    @Test
-    public void testReplicatePost() throws Throwable {
-        testReplicateXXX(HttpMethod.POST);
-    }
-
-    @Test
-    public void testReplicatePostWithEntity() throws Throwable {
-        testReplicateXXXEntity(HttpMethod.POST);
-    }
-
-    @Test
-    public void testReplicatePut() throws Throwable {
-        testReplicateXXX(HttpMethod.PUT);
-    }
-
-    @Test
-    public void testReplicatePutWithEntity() throws Throwable {
-        testReplicateXXXEntity(HttpMethod.PUT);
-    }
-
-    @Test
-    public void testReplicateDelete() throws Throwable {
-        testReplicateXXX(HttpMethod.DELETE);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testReplicateDeleteWithEntity() throws Throwable {
-        testReplicateXXXEntity(HttpMethod.DELETE);
-    }
-
-    @Test
-    public void testReplicateHead() throws Throwable {
-        testReplicateXXX(HttpMethod.HEAD);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testReplicateHeadWithEntity() throws Throwable {
-        testReplicateXXXEntity(HttpMethod.HEAD);
-    }
-
-    @Test
-    public void testReplicateOptions() throws Throwable {
-        testReplicateXXX(HttpMethod.OPTIONS);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testReplicateOptionsWithEntity() throws Throwable {
-        testReplicateXXXEntity(HttpMethod.OPTIONS);
-    }
-
-    private void testReplicateXXX(final String method) throws Throwable {
-        testReplicateXXX(executorThreadCount, method);
-    }
-
-    private void testReplicateXXX(final int numNodes, final String method) throws Throwable {
-
-        // nodes
-        Set<NodeIdentifier> nodeIds = createNodes(numNodes, "localhost", serverPort);
-
-        // set up responses
-        for (int i = 0; i < nodeIds.size(); i++) {
-            HttpResponse response = new HttpResponse(Status.OK, expectedBody);
-            response.addHeaders(expectedResponseHeaders);
-            server.addResponseAction(new HttpResponseAction(response));
-        }
-
-        // setup request parameters
-        server.addCheckedParameters(expectedRequestParameters);
-
-        // request headers
-        server.addCheckedHeaders(expectedRequestHeaders);
-
-        Set<NodeResponse> responses = replicator.replicate(
-                nodeIds,
-                method,
-                prototypeUri,
-                expectedRequestParameters,
-                expectedRequestHeaders);
-
-        Set<NodeIdentifier> returnedNodeIds = new HashSet<>();
-        for (NodeResponse response : responses) {
-
-            // check if we received an exception
-            if (response.hasThrowable()) {
-                throw response.getThrowable();
-            }
-
-            // gather ids to verify later
-            returnedNodeIds.add(response.getNodeId());
-
-            // check status
-            assertEquals(Status.OK.getStatusCode(), response.getStatus());
-
-            Response serverResponse = response.getResponse();
-
-            // check response headers are copied
-            assertTrue(containsHeaders(expectedResponseHeaders, serverResponse.getMetadata()));
-
-            // check entity stream
-            if (HttpMethod.HEAD.equalsIgnoreCase(method)) {
-                assertNull(serverResponse.getEntity());
-            } else {
-                assertTrue(isEquals((StreamingOutput) serverResponse.getEntity(), expectedBody));
-            }
-
-        }
-
-        // check node Ids
-        assertEquals(nodeIds, returnedNodeIds);
-    }
-
-    private void testReplicateXXXEntity(final String method) throws Throwable {
-        testReplicateXXXEntity(executorThreadCount, method);
-    }
-
-    private void testReplicateXXXEntity(final int numNodes, final String method) throws Throwable {
-
-        // nodes
-        Set<NodeIdentifier> nodeIds = createNodes(numNodes, "localhost", serverPort);
-
-        // set up responses
-        for (int i = 0; i < nodeIds.size(); i++) {
-            HttpResponse response = new HttpResponse(Status.OK, expectedBody);
-            response.addHeaders(expectedResponseHeaders);
-            server.addResponseAction(new HttpResponseAction(response));
-        }
-
-        // headers
-        expectedRequestHeaders.put("Content-Type", "application/xml");
-
-        Set<NodeResponse> responses = replicator.replicate(
-                nodeIds,
-                method,
-                prototypeUri,
-                expectedEntity,
-                expectedRequestHeaders);
-
-        Set<NodeIdentifier> returnedNodeIds = new HashSet<>();
-        for (NodeResponse response : responses) {
-
-            // check if we received an exception
-            if (response.hasThrowable()) {
-                throw response.getThrowable();
-            }
-
-            // gather ids to verify later
-            returnedNodeIds.add(response.getNodeId());
-
-            // check status
-            assertEquals(Status.OK.getStatusCode(), response.getStatus());
-
-            Response serverResponse = response.getResponse();
-
-            // check response headers are copied
-            assertTrue(containsHeaders(expectedResponseHeaders, serverResponse.getMetadata()));
-
-            // check entity stream
-            assertTrue(isEquals((StreamingOutput) serverResponse.getEntity(), expectedBody));
-
-        }
-
-        // check node Ids
-        assertEquals(nodeIds, returnedNodeIds);
-    }
-
-    private Set<NodeIdentifier> createNodes(int num, String host, int apiPort) {
-        Set<NodeIdentifier> result = new HashSet<>();
-        for (int i = 0; i < num; i++) {
-            result.add(new NodeIdentifier(String.valueOf(i), host, apiPort, host, 1));
-        }
-        return result;
-    }
-
-    private boolean isEquals(StreamingOutput so, String expectedText) throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        so.write(baos);
-        return expectedText.equals(new String(baos.toByteArray()));
-    }
-
-    private boolean containsHeaders(Map<String, String> expectedHeaders, MultivaluedMap<String, Object> metadata) {
-        for (Map.Entry<String, String> expectedEntry : expectedHeaders.entrySet()) {
-            if (expectedEntry.getValue().equals(metadata.getFirst(expectedEntry.getKey())) == false) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-}
-
-@XmlRootElement
-class Entity {
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java
deleted file mode 100644
index d45a4d1..0000000
--- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.impl;
-
-import org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
-import java.io.ByteArrayInputStream;
-import java.util.Map;
-import java.util.HashSet;
-import java.util.Set;
-import java.net.URI;
-import java.net.URISyntaxException;
-import org.apache.nifi.cluster.manager.NodeResponse;
-import org.apache.nifi.cluster.node.Node;
-import org.apache.nifi.cluster.node.Node.Status;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * @author unattributed
- */
-public class HttpResponseMapperImplTest {
-
-    private HttpResponseMapperImpl mapper;
-
-    private URI dummyUri;
-
-    @Before
-    public void setup() throws URISyntaxException {
-        mapper = new HttpResponseMapperImpl();
-        dummyUri = new URI("http://dummy.com");
-    }
-
-    @Test
-    public void testToNodeStatusWithNo2xxResponses() {
-
-        Set<NodeResponse> nodeResponses = new HashSet<>();
-        nodeResponses.add(createNodeResourceResponse("1", 400));
-        nodeResponses.add(createNodeResourceResponse("2", 100));
-        nodeResponses.add(createNodeResourceResponse("3", 300));
-        nodeResponses.add(createNodeResourceResponse("4", 500));
-
-        Map<NodeResponse, Status> map = mapper.map(dummyUri, nodeResponses);
-
-        // since no 2xx responses, any 5xx is disconnected
-        for (Map.Entry<NodeResponse, Status> entry : map.entrySet()) {
-            NodeResponse response = entry.getKey();
-            Status status = entry.getValue();
-            switch (response.getNodeId().getId()) {
-                case "1":
-                    assertTrue(status == Node.Status.CONNECTED);
-                    break;
-                case "2":
-                    assertTrue(status == Node.Status.CONNECTED);
-                    break;
-                case "3":
-                    assertTrue(status == Node.Status.CONNECTED);
-                    break;
-                case "4":
-                    assertTrue(status == Node.Status.DISCONNECTED);
-                    break;
-            }
-        }
-    }
-
-    @Test
-    public void testToNodeStatusWith2xxResponses() {
-
-        Set<NodeResponse> nodeResponses = new HashSet<>();
-        nodeResponses.add(createNodeResourceResponse("1", 200));
-        nodeResponses.add(createNodeResourceResponse("2", 100));
-        nodeResponses.add(createNodeResourceResponse("3", 300));
-        nodeResponses.add(createNodeResourceResponse("4", 500));
-
-        Map<NodeResponse, Status> map = mapper.map(dummyUri, nodeResponses);
-
-        // since there were 2xx responses, any non-2xx is disconnected
-        for (Map.Entry<NodeResponse, Status> entry : map.entrySet()) {
-            NodeResponse response = entry.getKey();
-            Status status = entry.getValue();
-            switch (response.getNodeId().getId()) {
-                case "1":
-                    assertTrue(status == Node.Status.CONNECTED);
-                    break;
-                case "2":
-                    assertTrue(status == Node.Status.DISCONNECTED);
-                    break;
-                case "3":
-                    assertTrue(status == Node.Status.DISCONNECTED);
-                    break;
-                case "4":
-                    assertTrue(status == Node.Status.DISCONNECTED);
-                    break;
-            }
-        }
-    }
-
-    private NodeResponse createNodeResourceResponse(String nodeId, int statusCode) {
-
-        ClientResponse clientResponse = mock(ClientResponse.class);
-        when(clientResponse.getStatus()).thenReturn(statusCode);
-        when(clientResponse.getHeaders()).thenReturn(new MultivaluedMapImpl());
-        when(clientResponse.getEntityInputStream()).thenReturn(new ByteArrayInputStream(new byte[0]));
-
-        NodeIdentifier nodeIdentifier = new NodeIdentifier(nodeId, "localhost", 1, "localhost", 1);
-        return new NodeResponse(nodeIdentifier, "GET", dummyUri, clientResponse, 1L, "111");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
deleted file mode 100644
index 13a192f..0000000
--- a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.impl;
-
-import org.apache.nifi.cluster.manager.impl.WebClusterManager;
-
-import static org.junit.Assert.assertEquals;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Locale;
-
-import org.junit.Test;
-
-public class TestWebClusterManager {
-
-    @Test
-    public void testNormalizedStatusSnapshotDate() throws ParseException {
-        final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:SS.SSS", Locale.US);
-        final Date date1 = df.parse("2014/01/01 00:00:00.000");
-        final Date date2 = df.parse("2014/01/01 00:04:59.999");
-        final Date date3 = df.parse("2014/01/01 00:05:00.000");
-        final Date date4 = df.parse("2014/01/01 00:05:00.001");
-
-        final Date normalized1 = WebClusterManager.normalizeStatusSnapshotDate(date1, 300000);
-        assertEquals(date1, normalized1);
-
-        final Date normalized2 = WebClusterManager.normalizeStatusSnapshotDate(date2, 300000);
-        assertEquals(date1, normalized2);
-
-        final Date normalized3 = WebClusterManager.normalizeStatusSnapshotDate(date3, 300000);
-        assertEquals(date3, normalized3);
-
-        final Date normalized4 = WebClusterManager.normalizeStatusSnapshotDate(date4, 300000);
-        assertEquals(date3, normalized4);
-    }
-
-}