You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/21 14:04:26 UTC
[14/64] [partial] incubator-nifi git commit: NIFI-270 Made all
changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
deleted file mode 100644
index 84565da..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.node;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.nifi.cluster.HeartbeatPayload;
-import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Represents a connected flow controller. Nodes always have an immutable
- * identifier and a status. The status may be changed, but never null.
- *
- * A Node may be cloned, but the cloning is a shallow copy of the instance.
- *
- * This class overrides hashCode and equals and considers two instances to be
- * equal if they have equal NodeIdentifiers.
- *
- * @author unattributed
- */
-public class Node implements Cloneable, Comparable<Node> {
-
- private static final Logger lockLogger = LoggerFactory.getLogger("cluster.lock");
-
- /**
- * The semantics of a Node status are as follows:
- * <ul>
- * <li>CONNECTED -- a flow controller that is connected to the cluster. A
- * connecting node transitions to connected after the cluster receives the
- * flow controller's first heartbeat. A connected node can transition to
- * disconnecting.</li>
- * <li>CONNECTING -- a flow controller has issued a connection request to
- * the cluster, but has not yet sent a heartbeat. A connecting node can
- * transition to disconnecting or connected. The cluster will not accept any
- * external requests to change the flow while any node is connecting.</li>
- * <li>DISCONNECTED -- a flow controller that is not connected to the
- * cluster. A disconnected node can transition to connecting.</li>
- * <li>DISCONNECTING -- a flow controller that is in the process of
- * disconnecting from the cluster. A disconnecting node will always
- * transition to disconnected.</li>
- * </ul>
- */
- public static enum Status {
-
- CONNECTED,
- CONNECTING,
- DISCONNECTED,
- DISCONNECTING
- }
-
- /**
- * the node's unique identifier
- */
- private final NodeIdentifier nodeId;
-
- /**
- * the node status
- */
- private Status status;
-
- /**
- * the last heartbeat received from the node
- */
- private Heartbeat lastHeartbeat;
-
- /**
- * the payload of the last heartbeat received from the node
- */
- private HeartbeatPayload lastHeartbeatPayload;
-
- /**
- * the last time the connection for this node was requested
- */
- private AtomicLong connectionRequestedTimestamp = new AtomicLong(0L);
-
- /**
- * a flag to indicate this node was disconnected because of a lack of
- * heartbeat
- */
- private boolean heartbeatDisconnection;
-
- public Node(final NodeIdentifier id, final Status status) {
- if (id == null) {
- throw new IllegalArgumentException("ID may not be null.");
- } else if (status == null) {
- throw new IllegalArgumentException("Status may not be null.");
- }
- this.nodeId = id;
- this.status = status;
- }
-
- public NodeIdentifier getNodeId() {
- return nodeId;
- }
-
- /**
- * Returns the last received heartbeat or null if no heartbeat has been set.
- *
- * @return a heartbeat or null
- */
- public Heartbeat getHeartbeat() {
- return lastHeartbeat;
- }
-
- public HeartbeatPayload getHeartbeatPayload() {
- return lastHeartbeatPayload;
- }
-
- /**
- * Sets the last heartbeat received.
- *
- * @param heartbeat a heartbeat
- *
- * @throws ProtocolException if the heartbeat's payload failed unmarshalling
- */
- public void setHeartbeat(final Heartbeat heartbeat) throws ProtocolException {
- this.lastHeartbeat = heartbeat;
- if (this.lastHeartbeat == null) {
- this.lastHeartbeatPayload = null;
- } else {
- final byte[] payload = lastHeartbeat.getPayload();
- if (payload == null || payload.length == 0) {
- this.lastHeartbeatPayload = null;
- } else {
- this.lastHeartbeatPayload = HeartbeatPayload.unmarshal(payload);
- }
- }
- }
-
- /**
- * Returns the time of the last received connection request for this node.
- *
- * @return the time when the connection request for this node was received.
- */
- public long getConnectionRequestedTimestamp() {
- return connectionRequestedTimestamp.get();
- }
-
- /**
- * Sets the time when the connection request for this node was last
- * received.
- *
- * This method is thread-safe and may be called without obtaining any lock.
- *
- * @param connectionRequestedTimestamp the time when the connection request for this node was last received
- */
- public void setConnectionRequestedTimestamp(long connectionRequestedTimestamp) {
- this.connectionRequestedTimestamp.set(connectionRequestedTimestamp);
- }
-
- /**
- * Returns true if the node was disconnected due to lack of heartbeat; false
- * otherwise.
- *
- * @return true if the node was disconnected due to lack of heartbeat; false
- * otherwise.
- */
- public boolean isHeartbeatDisconnection() {
- return heartbeatDisconnection;
- }
-
- /**
- * Sets the status to disconnected and flags the node as being disconnected
- * by lack of heartbeat.
- */
- public void setHeartbeatDisconnection() {
- setStatus(Status.DISCONNECTED);
- heartbeatDisconnection = true;
- }
-
- /**
- * @return the status
- */
- public Status getStatus() {
- return status;
- }
-
- /**
- * @param status a status
- */
- public void setStatus(final Status status) {
- if (status == null) {
- throw new IllegalArgumentException("Status may not be null.");
- }
- this.status = status;
- heartbeatDisconnection = false;
- }
-
- @Override
- public Node clone() {
- final Node clone = new Node(nodeId, status);
- clone.lastHeartbeat = lastHeartbeat;
- clone.lastHeartbeatPayload = lastHeartbeatPayload;
- clone.heartbeatDisconnection = heartbeatDisconnection;
- clone.connectionRequestedTimestamp = connectionRequestedTimestamp;
- return clone;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final Node other = (Node) obj;
- if (this.nodeId != other.nodeId && (this.nodeId == null || !this.nodeId.equals(other.nodeId))) {
- return false;
- }
- return true;
- }
-
- @Override
- public int hashCode() {
- int hash = 7;
- hash = 53 * hash + (this.nodeId != null ? this.nodeId.hashCode() : 0);
- return hash;
- }
-
- @Override
- public String toString() {
- return nodeId.toString();
- }
-
- @Override
- public int compareTo(final Node o) {
- if (o == null) {
- return -1;
- }
- return getNodeId().getId().compareTo(o.getNodeId().getId());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
deleted file mode 100644
index e26d196..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.spring;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery;
-import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.FactoryBean;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-
-/**
- * Factory bean for creating a singleton ClusterManagerProtocolServiceLocator
- * instance. If the application is configured to act as the cluster manager,
- * then null is always returned as the created instance.
- *
- * The cluster manager protocol service represents the socket endpoint for
- * sending internal socket messages to the cluster manager.
- */
-public class ClusterManagerProtocolServiceLocatorFactoryBean implements FactoryBean, ApplicationContextAware, DisposableBean {
-
- private ApplicationContext applicationContext;
-
- private ClusterServiceLocator locator;
-
- private NiFiProperties properties;
-
- @Override
- public Object getObject() throws Exception {
- /*
- * If configured for the cluster manager, then the service locator is never used.
- */
- if (properties.isClusterManager()) {
- return null;
- } else if (locator == null) {
-
- if (properties.getClusterProtocolUseMulticast()) {
-
- // get the service discovery instance
- final ClusterServiceDiscovery serviceDiscovery = applicationContext.getBean("clusterManagerProtocolServiceDiscovery", ClusterServiceDiscovery.class);
-
- // create service location configuration
- final ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
- config.setNumAttempts(properties.getClusterProtocolMulticastServiceLocatorAttempts());
-
- final int delay = (int) FormatUtils.getTimeDuration(properties.getClusterProtocolMulticastServiceLocatorAttemptsDelay(), TimeUnit.SECONDS);
- config.setTimeBetweenAttempts(delay);
- config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
-
- locator = new ClusterServiceLocator(serviceDiscovery);
- locator.setAttemptsConfig(config);
-
- } else {
- final String serviceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class);
- final InetSocketAddress serviceAddress = properties.getClusterNodeUnicastManagerProtocolAddress();
- final DiscoverableService service = new DiscoverableServiceImpl(serviceName, serviceAddress);
- locator = new ClusterServiceLocator(service);
- }
-
- // start the locator
- locator.start();
-
- }
- return locator;
-
- }
-
- @Override
- public Class getObjectType() {
- return ClusterServiceLocator.class;
- }
-
- @Override
- public boolean isSingleton() {
- return true;
- }
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = applicationContext;
- }
-
- @Override
- public void destroy() throws Exception {
- if (locator != null && locator.isRunning()) {
- locator.stop();
- }
- }
-
- public void setProperties(NiFiProperties properties) {
- this.properties = properties;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
deleted file mode 100644
index ef72298..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.spring;
-
-import java.io.File;
-import org.apache.nifi.cluster.firewall.impl.FileBasedClusterNodeFirewall;
-import org.apache.nifi.util.NiFiProperties;
-import org.springframework.beans.factory.FactoryBean;
-
-/**
- * Factory bean for creating a singleton FileBasedClusterNodeFirewall instance.
- */
-public class FileBasedClusterNodeFirewallFactoryBean implements FactoryBean {
-
- private FileBasedClusterNodeFirewall firewall;
-
- private NiFiProperties properties;
-
- @Override
- public Object getObject() throws Exception {
- if (firewall == null) {
- final File config = properties.getClusterManagerNodeFirewallFile();
- final File restoreDirectory = properties.getRestoreDirectory();
- if (config != null) {
- firewall = new FileBasedClusterNodeFirewall(config, restoreDirectory);
- }
- }
- return firewall;
- }
-
- @Override
- public Class getObjectType() {
- return FileBasedClusterNodeFirewall.class;
- }
-
- @Override
- public boolean isSingleton() {
- return true;
- }
-
- public void setProperties(NiFiProperties properties) {
- this.properties = properties;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
deleted file mode 100644
index 7169730..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.spring;
-
-import java.nio.file.Paths;
-import org.apache.nifi.admin.service.AuditService;
-import org.apache.nifi.cluster.event.EventManager;
-import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
-import org.apache.nifi.cluster.flow.DataFlowManagementService;
-import org.apache.nifi.cluster.manager.HttpRequestReplicator;
-import org.apache.nifi.cluster.manager.HttpResponseMapper;
-import org.apache.nifi.cluster.manager.impl.WebClusterManager;
-import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
-import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
-import org.apache.nifi.controller.service.ControllerServiceLoader;
-import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import org.apache.nifi.util.NiFiProperties;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.FactoryBean;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-
-/**
- * Factory bean for creating a singleton WebClusterManager instance. If the
- * application is not configured to act as the cluster manager, then null is
- * always returned as the created instance.
- */
-public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationContextAware {
-
- private ApplicationContext applicationContext;
-
- private WebClusterManager clusterManager;
-
- private NiFiProperties properties;
-
- private StringEncryptor encryptor;
-
- @Override
- public Object getObject() throws Exception {
- if (properties.isClusterManager() && properties.isNode()) {
- throw new IllegalStateException("Application may be configured as a cluster manager or a node, but not both.");
- } else if (!properties.isClusterManager()) {
- /*
- * If not configured for the cluster manager, then the cluster manager is never used.
- * null is returned so that we don't instantiate a thread pool or other resources.
- */
- return null;
- } else if (clusterManager == null) {
-
- // get the service configuration path (fail early)
- final String serviceConfigurationFile = properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE);
- if (serviceConfigurationFile == null) {
- throw new NullPointerException("The service configuration file has not been specified.");
- }
-
- final HttpRequestReplicator requestReplicator = applicationContext.getBean("httpRequestReplicator", HttpRequestReplicator.class);
- final HttpResponseMapper responseMapper = applicationContext.getBean("httpResponseMapper", HttpResponseMapper.class);
- final DataFlowManagementService dataFlowService = applicationContext.getBean("dataFlowManagementService", DataFlowManagementService.class);
- final ClusterManagerProtocolSenderListener senderListener = applicationContext.getBean("clusterManagerProtocolSenderListener", ClusterManagerProtocolSenderListener.class);
-
- // create the manager
- clusterManager = new WebClusterManager(
- requestReplicator,
- responseMapper,
- dataFlowService,
- senderListener,
- properties,
- encryptor
- );
-
- // set the service broadcaster
- if (properties.getClusterProtocolUseMulticast()) {
-
- // create broadcaster
- final ClusterServicesBroadcaster broadcaster = applicationContext.getBean("clusterServicesBroadcaster", ClusterServicesBroadcaster.class);
-
- // register the cluster manager protocol service
- final String clusterManagerProtocolServiceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class);
- final DiscoverableService clusterManagerProtocolService = new DiscoverableServiceImpl(clusterManagerProtocolServiceName, properties.getClusterManagerProtocolAddress());
- broadcaster.addService(clusterManagerProtocolService);
-
- clusterManager.setServicesBroadcaster(broadcaster);
- }
-
- // set the event manager
- clusterManager.setEventManager(applicationContext.getBean("nodeEventHistoryManager", EventManager.class));
-
- // set the cluster firewall
- clusterManager.setClusterFirewall(applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class));
-
- // set the audit service
- clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class));
-
- // load the controller services
- final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile));
- serviceLoader.loadControllerServices(clusterManager);
- }
- return clusterManager;
- }
-
- @Override
- public Class getObjectType() {
- return WebClusterManager.class;
- }
-
- @Override
- public boolean isSingleton() {
- return true;
- }
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = applicationContext;
- }
-
- public void setProperties(NiFiProperties properties) {
- this.properties = properties;
- }
-
- public void setEncryptor(final StringEncryptor encryptor) {
- this.encryptor = encryptor;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
deleted file mode 100644
index 1ed5b30..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.reporting;
-
-import org.apache.nifi.cluster.manager.impl.ClusteredReportingContext;
-import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.EventAccess;
-import org.apache.nifi.reporting.ReportingContext;
-import org.apache.nifi.reporting.ReportingTask;
-
-public class ClusteredReportingTaskNode extends AbstractReportingTaskNode {
-
- private final EventAccess eventAccess;
- private final BulletinRepository bulletinRepository;
- private final ControllerServiceProvider serviceProvider;
-
- public ClusteredReportingTaskNode(final ReportingTask reportingTask, final String id, final ProcessScheduler scheduler,
- final EventAccess eventAccess, final BulletinRepository bulletinRepository, final ControllerServiceProvider serviceProvider,
- final ValidationContextFactory validationContextFactory) {
- super(reportingTask, id, serviceProvider, scheduler, validationContextFactory);
-
- this.eventAccess = eventAccess;
- this.bulletinRepository = bulletinRepository;
- this.serviceProvider = serviceProvider;
- }
-
- @Override
- public ReportingContext getReportingContext() {
- return new ClusteredReportingContext(eventAccess, bulletinRepository, getProperties(), serviceProvider);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml
deleted file mode 100644
index 68c29bc..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ /dev/null
@@ -1,124 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<!-- marked as lazy so that clustering beans are not created when the application runs in non-clustered mode -->
-<beans default-lazy-init="true"
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:util="http://www.springframework.org/schema/util"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:aop="http://www.springframework.org/schema/aop"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
- http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
- http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd">
-
- <!-- jersey client -->
- <bean id="jersey-client" class="org.apache.nifi.web.util.WebUtils" factory-method="createClient">
- <constructor-arg>
- <bean class="com.sun.jersey.api.client.config.DefaultClientConfig"/>
- </constructor-arg>
- <constructor-arg>
- <bean class="org.apache.nifi.framework.security.util.SslContextFactory" factory-method="createSslContext">
- <constructor-arg ref="nifiProperties"/>
- </bean>
- </constructor-arg>
- </bean>
-
- <!-- http request replicator -->
- <bean id="httpRequestReplicator" class="org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl">
- <constructor-arg index="0">
- <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiRequestThreads"/>
- </constructor-arg>
- <constructor-arg ref="jersey-client" index="1"/>
- <constructor-arg index="2">
- <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiConnectionTimeout"/>
- </constructor-arg>
- <constructor-arg index="3">
- <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiReadTimeout"/>
- </constructor-arg>
- <property name="nodeProtocolScheme">
- <bean factory-bean="nifiProperties" factory-method="getClusterProtocolManagerToNodeApiScheme"/>
- </property>
- </bean>
-
- <!-- http response mapper -->
- <bean id="httpResponseMapper" class="org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl"/>
-
- <!-- cluster flow DAO -->
- <bean id="dataFlowDao" class="org.apache.nifi.cluster.flow.impl.DataFlowDaoImpl">
- <constructor-arg index="0">
- <bean factory-bean="nifiProperties" factory-method="getFlowConfigurationFileDir"/>
- </constructor-arg>
- <constructor-arg index="1">
- <bean factory-bean="nifiProperties" factory-method="getRestoreDirectory"/>
- </constructor-arg>
- <constructor-arg index="2">
- <bean factory-bean="nifiProperties" factory-method="getAutoResumeState"/>
- </constructor-arg>
- </bean>
-
- <!-- dataflow management service -->
- <bean id="dataFlowManagementService" class="org.apache.nifi.cluster.flow.impl.DataFlowManagementServiceImpl">
- <constructor-arg ref="dataFlowDao"/>
- <constructor-arg ref="clusterManagerProtocolSender"/>
- <property name="retrievalDelay">
- <bean factory-bean="nifiProperties" factory-method="getClusterManagerFlowRetrievalDelay"/>
- </property>
- </bean>
-
- <!-- node event history manager -->
- <bean id="nodeEventHistoryManager" class="org.apache.nifi.cluster.event.impl.EventManagerImpl">
- <constructor-arg>
- <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeEventHistorySize"/>
- </constructor-arg>
- </bean>
-
- <!-- cluster firewall -->
- <bean id="clusterFirewall" class="org.apache.nifi.cluster.spring.FileBasedClusterNodeFirewallFactoryBean">
- <property name="properties" ref="nifiProperties"/>
- </bean>
-
- <!-- cluster manager -->
- <bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean">
- <property name="properties" ref="nifiProperties"/>
- <property name="encryptor" ref="stringEncryptor"/>
- </bean>
-
- <!-- discoverable services -->
-
- <!-- cluster manager protocol discoverable service -->
-
- <!-- service name for communicating with the cluster manager using sockets -->
- <bean id="clusterManagerProtocolServiceName" class="java.lang.String">
- <constructor-arg value="cluster-manager-protocol" />
- </bean>
-
- <!-- cluster manager protocol service discovery -->
- <bean id="clusterManagerProtocolServiceDiscovery" class="org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery">
- <constructor-arg ref="clusterManagerProtocolServiceName" index="0"/>
- <constructor-arg index="1">
- <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastAddress"/>
- </constructor-arg>
- <constructor-arg ref="protocolMulticastConfiguration" index="2"/>
- <constructor-arg ref="protocolContext" index="3"/>
- </bean>
-
- <!-- cluster manager protocol service locator -->
- <bean id="clusterManagerProtocolServiceLocator" class="org.apache.nifi.cluster.spring.ClusterManagerProtocolServiceLocatorFactoryBean">
- <property name="properties" ref="nifiProperties"/>
- </bean>
-
-</beans>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java
deleted file mode 100644
index 09ea44b..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.event.impl;
-
-import org.apache.nifi.cluster.event.impl.EventManagerImpl;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.nifi.cluster.event.Event;
-import org.apache.nifi.cluster.event.Event.Category;
-import org.apache.nifi.cluster.event.EventManager;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-/**
- * @author unattributed
- */
-public class EventManagerImplTest {
-
- @Test(expected = IllegalArgumentException.class)
- public void testNonPositiveHistorySize() {
- new EventManagerImpl(0);
- }
-
- @Test
- public void testGetEventsUnknownSource() {
- EventManager manager = new EventManagerImpl(1);
- assertEquals(Collections.EMPTY_LIST, manager.getEvents("unknown value"));
- }
-
- @Test
- public void testGetEvents() {
-
- EventManager manager = new EventManagerImpl(2);
-
- Event e1 = new Event("1", "Event1", Category.INFO, 0);
- Event e2 = new Event("1", "Event2", Category.INFO, 1);
-
- manager.addEvent(e1);
- manager.addEvent(e2);
-
- List<Event> events = manager.getEvents("1");
-
- // assert newest to oldest
- assertEquals(Arrays.asList(e2, e1), events);
- }
-
- @Test
- public void testGetMostRecentEventUnknownSource() {
- EventManager manager = new EventManagerImpl(1);
- assertNull(manager.getMostRecentEvent("unknown value"));
- }
-
- @Test
- public void testGetMostRecentEvent() {
-
- EventManager manager = new EventManagerImpl(2);
-
- Event e1 = new Event("1", "Event1", Category.INFO, 0);
- Event e2 = new Event("1", "Event2", Category.INFO, 1);
-
- manager.addEvent(e1);
- manager.addEvent(e2);
-
- // assert newest to oldest
- assertEquals(e2, manager.getMostRecentEvent("1"));
- }
-
- @Test
- public void testAddEventExceedsHistorySize() {
-
- EventManager manager = new EventManagerImpl(1);
-
- Event e1 = new Event("1", "Event1", Category.INFO, 0);
- Event e2 = new Event("1", "Event2", Category.INFO, 1);
-
- manager.addEvent(e1);
- manager.addEvent(e2);
-
- List<Event> events = manager.getEvents("1");
-
- // assert oldest evicted
- assertEquals(Arrays.asList(e2), events);
-
- }
-
- @Test
- public void testClearHistory() {
-
- EventManager manager = new EventManagerImpl(1);
-
- Event e1 = new Event("1", "Event1", Category.INFO, 0);
- Event e2 = new Event("1", "Event2", Category.INFO, 1);
-
- manager.addEvent(e1);
- manager.addEvent(e2);
-
- manager.clearEventHistory("1");
-
- // assert oldest evicted
- assertTrue(manager.getEvents("1").isEmpty());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
deleted file mode 100644
index e5db7ca..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.firewall.impl;
-
-import java.io.File;
-import java.io.IOException;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import static org.junit.Assert.*;
-import org.junit.Test;
-
-public class FileBasedClusterNodeFirewallTest {
-
- private FileBasedClusterNodeFirewall ipsFirewall;
-
- private FileBasedClusterNodeFirewall acceptAllFirewall;
-
- private File ipsConfig;
-
- private File emptyConfig;
-
- private File restoreDirectory;
-
- @Before
- public void setup() throws Exception {
-
- ipsConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt");
- emptyConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt");
-
- restoreDirectory = new File(System.getProperty("java.io.tmpdir") + "/firewall_restore");
-
- ipsFirewall = new FileBasedClusterNodeFirewall(ipsConfig, restoreDirectory);
- acceptAllFirewall = new FileBasedClusterNodeFirewall(emptyConfig);
- }
-
- @After
- public void teardown() throws IOException {
- deleteFile(restoreDirectory);
- }
-
- @Test
- public void testSyncWithRestore() {
- assertEquals(ipsConfig.length(), new File(restoreDirectory, ipsConfig.getName()).length());
- }
-
- @Test
- public void testIsPermissibleWithExactMatch() {
- assertTrue(ipsFirewall.isPermissible("2.2.2.2"));
- }
-
- @Test
- public void testIsPermissibleWithSubnetMatch() {
- assertTrue(ipsFirewall.isPermissible("3.3.3.255"));
- }
-
- @Test
- public void testIsPermissibleWithNoMatch() {
- assertFalse(ipsFirewall.isPermissible("255.255.255.255"));
- }
-
- @Test
- public void testIsPermissibleWithMalformedData() {
- assertFalse(ipsFirewall.isPermissible("abc"));
- }
-
- @Test
- public void testIsPermissibleWithEmptyConfig() {
- assertTrue(acceptAllFirewall.isPermissible("1.1.1.1"));
- }
-
- @Test
- public void testIsPermissibleWithEmptyConfigWithMalformedData() {
- assertTrue(acceptAllFirewall.isPermissible("abc"));
- }
-
- private boolean deleteFile(final File file) {
- if (file.isDirectory()) {
- FileUtils.deleteFilesInDir(file, null, null, true, true);
- }
- return FileUtils.deleteFile(file, null, 10);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
deleted file mode 100644
index f9ba016..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow.impl;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import org.apache.nifi.cluster.flow.DataFlowDao;
-import org.apache.nifi.cluster.flow.PersistedFlowState;
-import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl;
-import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.io.socket.ServerSocketConfiguration;
-import org.apache.nifi.io.socket.SocketConfiguration;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.xml.sax.SAXException;
-
-/**
- * @author unattributed
- */
-public class DataFlowManagementServiceImplTest {
-
- private DataFlowManagementServiceImpl service;
- private File restoreLocation;
- private File primaryLocation;
- private DataFlowDao dao;
- private int apiDummyPort;
- private int socketPort;
- private SocketConfiguration socketConfig;
- private ClusterManagerProtocolSender sender;
- private ServerSocketConfiguration serverSocketConfig;
- private SocketProtocolListener listener;
-
- @Before
- public void setup() throws IOException {
-
- primaryLocation = new File(System.getProperty("java.io.tmpdir") + "/primary" + this.getClass().getSimpleName());
- restoreLocation = new File(System.getProperty("java.io.tmpdir") + "/restore" + this.getClass().getSimpleName());
-
- FileUtils.deleteDirectory(primaryLocation);
- FileUtils.deleteDirectory(restoreLocation);
-
- ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
- socketConfig = new SocketConfiguration();
- socketConfig.setSocketTimeout(1000);
- serverSocketConfig = new ServerSocketConfiguration();
-
- dao = new DataFlowDaoImpl(primaryLocation, restoreLocation, false);
-
- sender = new ClusterManagerProtocolSenderImpl(socketConfig, protocolContext);
-
- service = new DataFlowManagementServiceImpl(dao, sender);
- service.start();
-
- listener = new SocketProtocolListener(1, 0, serverSocketConfig, protocolContext);
- listener.start();
-
- apiDummyPort = 7777;
- socketPort = listener.getPort();
- }
-
- @After
- public void teardown() throws IOException {
-
- if (service != null && service.isRunning()) {
- service.stop();
- }
-
- if (listener != null && listener.isRunning()) {
- try {
- listener.stop();
- } catch (final Exception ex) {
- ex.printStackTrace(System.out);
- }
- }
- FileUtils.deleteDirectory(primaryLocation);
- FileUtils.deleteDirectory(restoreLocation);
-
- }
-
- @Test
- public void testLoadFlowWithNonExistentFlow() throws ParserConfigurationException, SAXException, IOException {
- verifyFlow();
- }
-
- @Test
- public void testLoadFlowWithNonExistentFlowWhenServiceStopped() throws IOException, SAXException, ParserConfigurationException {
- service.stop();
- verifyFlow();
- }
-
- private void verifyFlow() throws ParserConfigurationException, SAXException, IOException {
- final byte[] flowBytes = service.loadDataFlow().getDataFlow().getFlow();
- final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
- final Document doc = docBuilder.parse(new ByteArrayInputStream(flowBytes));
- final Element controller = (Element) doc.getElementsByTagName("flowController").item(0);
- final Element rootGroup = (Element) controller.getElementsByTagName("rootGroup").item(0);
- final String rootGroupName = rootGroup.getElementsByTagName("name").item(0).getTextContent();
- assertEquals("NiFi Flow", rootGroupName);
- }
-
- @Test
- public void testLoadFlowSingleNode() throws Exception {
- String flowStr = "<rootGroup />";
- byte[] flowBytes = flowStr.getBytes();
- listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
-
- NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort);
- service.setNodeIds(new HashSet<>(Arrays.asList(nodeId)));
- service.setPersistedFlowState(PersistedFlowState.STALE);
-
- assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
- // sleep long enough for the flow retriever to run
- waitForState(PersistedFlowState.CURRENT);
-
- assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
- assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
-
- }
-
- @Test
- public void testLoadFlowWithSameNodeIds() throws Exception {
-
- String flowStr = "<rootGroup />";
- listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
-
- NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort);
- NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
- service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
- service.setPersistedFlowState(PersistedFlowState.STALE);
-
- assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
- // sleep long enough for the flow retriever to run
- waitForState(PersistedFlowState.CURRENT);
-
- // verify that flow is current
- assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
-
- // add same ids in different order
- service.setNodeIds(new HashSet<>(Arrays.asList(nodeId2, nodeId1)));
-
- // verify flow is still current
- assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
-
- }
-
- @Test
- public void testLoadFlowWithABadNode() throws Exception {
-
- String flowStr = "<rootGroup />";
- byte[] flowBytes = flowStr.getBytes();
- listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
-
- NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1);
- NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
- service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
- service.setPersistedFlowState(PersistedFlowState.STALE);
-
- assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
- // sleep long enough for the flow retriever to run
- waitForState(PersistedFlowState.CURRENT);
-
- assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
- assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
-
- }
-
- @Test
- public void testLoadFlowWithConstantNodeIdChanging() throws Exception {
- String flowStr = "<rootGroup />";
- byte[] flowBytes = flowStr.getBytes();
- listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
-
- NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1);
- NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
-
- for (int i = 0; i < 1000; i++) {
- service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
- service.setPersistedFlowState(PersistedFlowState.STALE);
- assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
- }
-
- // sleep long enough for the flow retriever to run
- waitForState(PersistedFlowState.CURRENT);
-
- assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState());
- assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
- }
-
- @Test
- public void testLoadFlowWithConstantNodeIdChangingWithRetrievalDelay() throws Exception {
-
- String flowStr = "<rootGroup />";
- listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
-
- NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1);
- NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort);
-
- service.setRetrievalDelay("5 sec");
- for (int i = 0; i < 1000; i++) {
- service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2)));
- service.setPersistedFlowState(PersistedFlowState.STALE);
- assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
- }
-
- // sleep long enough for the flow retriever to run
- waitForState(PersistedFlowState.STALE);
-
- assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
- }
-
- @Test
- public void testStopRequestedWhileRetrieving() throws Exception {
-
- String flowStr = "<rootGroup />";
- listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
- Set<NodeIdentifier> nodeIds = new HashSet<>();
- for (int i = 0; i < 1000; i++) {
- nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1));
- }
- nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort));
-
- long lastRetrievalTime = service.getLastRetrievalTime();
-
- service.setNodeIds(nodeIds);
- service.setPersistedFlowState(PersistedFlowState.STALE);
- assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
- // sleep long enough for the flow retriever to run
- waitForState(PersistedFlowState.STALE);
-
- service.stop();
-
- service.setPersistedFlowState(PersistedFlowState.STALE);
- assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
- assertEquals(lastRetrievalTime, service.getLastRetrievalTime());
-
- }
-
- @Test
- public void testLoadFlowUnknownState() throws Exception {
-
- String flowStr = "<rootGroup />";
- byte[] flowBytes = flowStr.getBytes();
- listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
- NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort);
-
- service.setNodeIds(new HashSet<>(Arrays.asList(nodeId)));
- service.setPersistedFlowState(PersistedFlowState.STALE);
- assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
- service.setPersistedFlowState(PersistedFlowState.UNKNOWN);
-
- assertEquals(PersistedFlowState.UNKNOWN, service.getPersistedFlowState());
-
- service.setPersistedFlowState(PersistedFlowState.STALE);
- assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState());
-
- // sleep long enough for the flow retriever to run
- waitForState(PersistedFlowState.CURRENT);
-
- assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow());
-
- }
-
- private class FlowRequestProtocolHandler implements ProtocolHandler {
-
- private StandardDataFlow dataFlow;
-
- public FlowRequestProtocolHandler(final StandardDataFlow dataFlow) {
- this.dataFlow = dataFlow;
- }
-
- @Override
- public boolean canHandle(ProtocolMessage msg) {
- return true;
- }
-
- @Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
- FlowResponseMessage response = new FlowResponseMessage();
- response.setDataFlow(dataFlow);
- return response;
- }
-
- }
-
- private void waitForState(PersistedFlowState state) throws InterruptedException {
- for (int i = 0; i < 30; i++) {
- if (service.getPersistedFlowState() == state) {
- break;
- } else {
- Thread.sleep(1000);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
deleted file mode 100644
index 0c65aba..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.impl;
-
-import org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MultivaluedMap;
-import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.Iterator;
-import javax.ws.rs.core.StreamingOutput;
-import org.apache.nifi.cluster.manager.testutils.HttpResponse;
-import org.apache.nifi.cluster.manager.testutils.HttpServer;
-import com.sun.jersey.api.client.Client;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.ws.rs.core.Response.Status;
-import org.apache.nifi.cluster.manager.NodeResponse;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.apache.nifi.cluster.manager.testutils.HttpResponseAction;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import static org.junit.Assert.*;
-
-/**
- * @author unattributed
- */
-public class HttpRequestReplicatorImplTest {
-
- private Client client;
- private HttpRequestReplicatorImpl replicator;
- private int executorThreadCount;
- private int serverThreadCount;
- private int serverPort;
- private HttpServer server;
- private Map<String, List<String>> expectedRequestParameters;
- private Map<String, String> expectedRequestHeaders;
- private Map<String, String> expectedResponseHeaders;
- private Object expectedEntity;
- private String expectedBody;
- private URI prototypeUri;
-
- @Before
- public void setUp() throws IOException, URISyntaxException {
-
- executorThreadCount = 5;
- serverThreadCount = 3;
-
- client = Client.create();
-
- replicator = new HttpRequestReplicatorImpl(executorThreadCount, client, "1 sec", "1 sec");
- replicator.start();
-
- expectedRequestHeaders = new HashMap<>();
- expectedRequestHeaders.put("header1", "header value1");
- expectedRequestHeaders.put("header2", "header value2");
-
- expectedRequestParameters = new HashMap<>();
- expectedRequestParameters.put("param1", Arrays.asList("p value1"));
- expectedRequestParameters.put("param2", Arrays.asList("p value2"));
-
- expectedResponseHeaders = new HashMap<>();
- expectedResponseHeaders.put("header1", "header value1");
- expectedResponseHeaders.put("header2", "header value2");
-
- expectedEntity = new Entity();
-
- expectedBody = "some text";
-
- prototypeUri = new URI("http://prototype.host/path/to/resource");
-
- server = new HttpServer(serverThreadCount, 0);
- server.start();
- serverPort = server.getPort();
- }
-
- @After
- public void teardown() {
- if (server.isRunning()) {
- server.stop();
- }
- if (replicator.isRunning()) {
- replicator.stop();
- }
- }
-
- @Test
- public void testReplicateGetLessNodesThanReplicatorThreads() throws Throwable {
- testReplicateXXX(executorThreadCount - 1, HttpMethod.GET);
- }
-
- @Test
- public void testReplicateGetMoreNodesThanReplicatorThreads() throws Throwable {
- testReplicateXXX(executorThreadCount + 1, HttpMethod.GET);
- }
-
- @Test
- public void testReplicateGetWithUnresponsiveNode() throws Throwable {
-
- // nodes
- Set<NodeIdentifier> nodeIds = createNodes(2, "localhost", serverPort);
-
- // response
- HttpResponse expectedResponse = new HttpResponse(Status.OK, expectedBody);
-
- // first response normal, second response slow
- server.addResponseAction(new HttpResponseAction(expectedResponse));
- server.addResponseAction(new HttpResponseAction(expectedResponse, 3500));
-
- Set<NodeResponse> responses = replicator.replicate(
- nodeIds,
- HttpMethod.GET,
- prototypeUri,
- expectedRequestParameters,
- expectedRequestHeaders);
-
- assertEquals(nodeIds.size(), responses.size());
-
- Iterator<NodeResponse> nodeResponseItr = responses.iterator();
-
- NodeResponse firstResponse = nodeResponseItr.next();
- NodeResponse secondResponse = nodeResponseItr.next();
- NodeResponse goodResponse;
- NodeResponse badResponse;
- if (firstResponse.hasThrowable()) {
- goodResponse = secondResponse;
- badResponse = firstResponse;
- } else {
- goodResponse = firstResponse;
- badResponse = secondResponse;
- }
-
- // good response
- // check status
- assertEquals(Status.OK.getStatusCode(), goodResponse.getStatus());
-
- // check entity stream
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ((StreamingOutput) goodResponse.getResponse().getEntity()).write(baos);
- assertEquals("some text", new String(baos.toByteArray()));
-
- // bad response
- assertTrue(badResponse.hasThrowable());
- assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), badResponse.getStatus());
-
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testReplicateGetWithEntity() throws Throwable {
- testReplicateXXXEntity(HttpMethod.GET);
- }
-
- @Test
- public void testReplicatePost() throws Throwable {
- testReplicateXXX(HttpMethod.POST);
- }
-
- @Test
- public void testReplicatePostWithEntity() throws Throwable {
- testReplicateXXXEntity(HttpMethod.POST);
- }
-
- @Test
- public void testReplicatePut() throws Throwable {
- testReplicateXXX(HttpMethod.PUT);
- }
-
- @Test
- public void testReplicatePutWithEntity() throws Throwable {
- testReplicateXXXEntity(HttpMethod.PUT);
- }
-
- @Test
- public void testReplicateDelete() throws Throwable {
- testReplicateXXX(HttpMethod.DELETE);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testReplicateDeleteWithEntity() throws Throwable {
- testReplicateXXXEntity(HttpMethod.DELETE);
- }
-
- @Test
- public void testReplicateHead() throws Throwable {
- testReplicateXXX(HttpMethod.HEAD);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testReplicateHeadWithEntity() throws Throwable {
- testReplicateXXXEntity(HttpMethod.HEAD);
- }
-
- @Test
- public void testReplicateOptions() throws Throwable {
- testReplicateXXX(HttpMethod.OPTIONS);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testReplicateOptionsWithEntity() throws Throwable {
- testReplicateXXXEntity(HttpMethod.OPTIONS);
- }
-
- private void testReplicateXXX(final String method) throws Throwable {
- testReplicateXXX(executorThreadCount, method);
- }
-
- private void testReplicateXXX(final int numNodes, final String method) throws Throwable {
-
- // nodes
- Set<NodeIdentifier> nodeIds = createNodes(numNodes, "localhost", serverPort);
-
- // set up responses
- for (int i = 0; i < nodeIds.size(); i++) {
- HttpResponse response = new HttpResponse(Status.OK, expectedBody);
- response.addHeaders(expectedResponseHeaders);
- server.addResponseAction(new HttpResponseAction(response));
- }
-
- // setup request parameters
- server.addCheckedParameters(expectedRequestParameters);
-
- // request headers
- server.addCheckedHeaders(expectedRequestHeaders);
-
- Set<NodeResponse> responses = replicator.replicate(
- nodeIds,
- method,
- prototypeUri,
- expectedRequestParameters,
- expectedRequestHeaders);
-
- Set<NodeIdentifier> returnedNodeIds = new HashSet<>();
- for (NodeResponse response : responses) {
-
- // check if we received an exception
- if (response.hasThrowable()) {
- throw response.getThrowable();
- }
-
- // gather ids to verify later
- returnedNodeIds.add(response.getNodeId());
-
- // check status
- assertEquals(Status.OK.getStatusCode(), response.getStatus());
-
- Response serverResponse = response.getResponse();
-
- // check response headers are copied
- assertTrue(containsHeaders(expectedResponseHeaders, serverResponse.getMetadata()));
-
- // check entity stream
- if (HttpMethod.HEAD.equalsIgnoreCase(method)) {
- assertNull(serverResponse.getEntity());
- } else {
- assertTrue(isEquals((StreamingOutput) serverResponse.getEntity(), expectedBody));
- }
-
- }
-
- // check node Ids
- assertEquals(nodeIds, returnedNodeIds);
- }
-
- private void testReplicateXXXEntity(final String method) throws Throwable {
- testReplicateXXXEntity(executorThreadCount, method);
- }
-
- private void testReplicateXXXEntity(final int numNodes, final String method) throws Throwable {
-
- // nodes
- Set<NodeIdentifier> nodeIds = createNodes(numNodes, "localhost", serverPort);
-
- // set up responses
- for (int i = 0; i < nodeIds.size(); i++) {
- HttpResponse response = new HttpResponse(Status.OK, expectedBody);
- response.addHeaders(expectedResponseHeaders);
- server.addResponseAction(new HttpResponseAction(response));
- }
-
- // headers
- expectedRequestHeaders.put("Content-Type", "application/xml");
-
- Set<NodeResponse> responses = replicator.replicate(
- nodeIds,
- method,
- prototypeUri,
- expectedEntity,
- expectedRequestHeaders);
-
- Set<NodeIdentifier> returnedNodeIds = new HashSet<>();
- for (NodeResponse response : responses) {
-
- // check if we received an exception
- if (response.hasThrowable()) {
- throw response.getThrowable();
- }
-
- // gather ids to verify later
- returnedNodeIds.add(response.getNodeId());
-
- // check status
- assertEquals(Status.OK.getStatusCode(), response.getStatus());
-
- Response serverResponse = response.getResponse();
-
- // check response headers are copied
- assertTrue(containsHeaders(expectedResponseHeaders, serverResponse.getMetadata()));
-
- // check entity stream
- assertTrue(isEquals((StreamingOutput) serverResponse.getEntity(), expectedBody));
-
- }
-
- // check node Ids
- assertEquals(nodeIds, returnedNodeIds);
- }
-
- private Set<NodeIdentifier> createNodes(int num, String host, int apiPort) {
- Set<NodeIdentifier> result = new HashSet<>();
- for (int i = 0; i < num; i++) {
- result.add(new NodeIdentifier(String.valueOf(i), host, apiPort, host, 1));
- }
- return result;
- }
-
- private boolean isEquals(StreamingOutput so, String expectedText) throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- so.write(baos);
- return expectedText.equals(new String(baos.toByteArray()));
- }
-
- private boolean containsHeaders(Map<String, String> expectedHeaders, MultivaluedMap<String, Object> metadata) {
- for (Map.Entry<String, String> expectedEntry : expectedHeaders.entrySet()) {
- if (expectedEntry.getValue().equals(metadata.getFirst(expectedEntry.getKey())) == false) {
- return false;
- }
- }
- return true;
- }
-
-}
-
-@XmlRootElement
-class Entity {
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java
deleted file mode 100644
index d45a4d1..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.impl;
-
-import org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
-import java.io.ByteArrayInputStream;
-import java.util.Map;
-import java.util.HashSet;
-import java.util.Set;
-import java.net.URI;
-import java.net.URISyntaxException;
-import org.apache.nifi.cluster.manager.NodeResponse;
-import org.apache.nifi.cluster.node.Node;
-import org.apache.nifi.cluster.node.Node.Status;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * @author unattributed
- */
-public class HttpResponseMapperImplTest {
-
- private HttpResponseMapperImpl mapper;
-
- private URI dummyUri;
-
- @Before
- public void setup() throws URISyntaxException {
- mapper = new HttpResponseMapperImpl();
- dummyUri = new URI("http://dummy.com");
- }
-
- @Test
- public void testToNodeStatusWithNo2xxResponses() {
-
- Set<NodeResponse> nodeResponses = new HashSet<>();
- nodeResponses.add(createNodeResourceResponse("1", 400));
- nodeResponses.add(createNodeResourceResponse("2", 100));
- nodeResponses.add(createNodeResourceResponse("3", 300));
- nodeResponses.add(createNodeResourceResponse("4", 500));
-
- Map<NodeResponse, Status> map = mapper.map(dummyUri, nodeResponses);
-
- // since no 2xx responses, any 5xx is disconnected
- for (Map.Entry<NodeResponse, Status> entry : map.entrySet()) {
- NodeResponse response = entry.getKey();
- Status status = entry.getValue();
- switch (response.getNodeId().getId()) {
- case "1":
- assertTrue(status == Node.Status.CONNECTED);
- break;
- case "2":
- assertTrue(status == Node.Status.CONNECTED);
- break;
- case "3":
- assertTrue(status == Node.Status.CONNECTED);
- break;
- case "4":
- assertTrue(status == Node.Status.DISCONNECTED);
- break;
- }
- }
- }
-
- @Test
- public void testToNodeStatusWith2xxResponses() {
-
- Set<NodeResponse> nodeResponses = new HashSet<>();
- nodeResponses.add(createNodeResourceResponse("1", 200));
- nodeResponses.add(createNodeResourceResponse("2", 100));
- nodeResponses.add(createNodeResourceResponse("3", 300));
- nodeResponses.add(createNodeResourceResponse("4", 500));
-
- Map<NodeResponse, Status> map = mapper.map(dummyUri, nodeResponses);
-
- // since there were 2xx responses, any non-2xx is disconnected
- for (Map.Entry<NodeResponse, Status> entry : map.entrySet()) {
- NodeResponse response = entry.getKey();
- Status status = entry.getValue();
- switch (response.getNodeId().getId()) {
- case "1":
- assertTrue(status == Node.Status.CONNECTED);
- break;
- case "2":
- assertTrue(status == Node.Status.DISCONNECTED);
- break;
- case "3":
- assertTrue(status == Node.Status.DISCONNECTED);
- break;
- case "4":
- assertTrue(status == Node.Status.DISCONNECTED);
- break;
- }
- }
- }
-
- private NodeResponse createNodeResourceResponse(String nodeId, int statusCode) {
-
- ClientResponse clientResponse = mock(ClientResponse.class);
- when(clientResponse.getStatus()).thenReturn(statusCode);
- when(clientResponse.getHeaders()).thenReturn(new MultivaluedMapImpl());
- when(clientResponse.getEntityInputStream()).thenReturn(new ByteArrayInputStream(new byte[0]));
-
- NodeIdentifier nodeIdentifier = new NodeIdentifier(nodeId, "localhost", 1, "localhost", 1);
- return new NodeResponse(nodeIdentifier, "GET", dummyUri, clientResponse, 1L, "111");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
deleted file mode 100644
index 13a192f..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.impl;
-
-import org.apache.nifi.cluster.manager.impl.WebClusterManager;
-
-import static org.junit.Assert.assertEquals;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Locale;
-
-import org.junit.Test;
-
-/**
- * Tests {@code WebClusterManager.normalizeStatusSnapshotDate}.
- */
-public class TestWebClusterManager {
-
-    /**
-     * Normalizes four timestamps around a five-minute boundary with a
-     * 300000 ms interval and expects each to map to the start of the
-     * interval that contains it.
-     *
-     * @throws ParseException if one of the fixture timestamps cannot be parsed
-     */
-    @Test
-    public void testNormalizedStatusSnapshotDate() throws ParseException {
-        // 'ss' parses seconds; 'SS' would be read as milliseconds and leave the
-        // seconds at zero, so 00:04:59.999 would not sit just below the boundary
-        final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS", Locale.US);
-        final Date date1 = df.parse("2014/01/01 00:00:00.000");
-        final Date date2 = df.parse("2014/01/01 00:04:59.999");
-        final Date date3 = df.parse("2014/01/01 00:05:00.000");
-        final Date date4 = df.parse("2014/01/01 00:05:00.001");
-
-        // exactly on the first boundary: unchanged
-        final Date normalized1 = WebClusterManager.normalizeStatusSnapshotDate(date1, 300000);
-        assertEquals(date1, normalized1);
-
-        // one millisecond before the next boundary: falls back to the first boundary
-        final Date normalized2 = WebClusterManager.normalizeStatusSnapshotDate(date2, 300000);
-        assertEquals(date1, normalized2);
-
-        // exactly on the second boundary: unchanged
-        final Date normalized3 = WebClusterManager.normalizeStatusSnapshotDate(date3, 300000);
-        assertEquals(date3, normalized3);
-
-        // one millisecond past the second boundary: falls back to it
-        final Date normalized4 = WebClusterManager.normalizeStatusSnapshotDate(date4, 300000);
-        assertEquals(date3, normalized4);
-    }
-
-}